X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/67464732a18e0aac11d89a75987151543b704cf6..17479bd75a29c52470abe0049cb447e114eb39e9:/lib/dispatchcloud/scheduler/fix_stale_locks.go diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go index e9644aed21..985941090d 100644 --- a/lib/dispatchcloud/scheduler/fix_stale_locks.go +++ b/lib/dispatchcloud/scheduler/fix_stale_locks.go @@ -9,17 +9,17 @@ import ( "git.curoverse.com/arvados.git/lib/dispatchcloud/worker" "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/Sirupsen/logrus" ) -// FixStaleLocks waits for any already-locked containers (i.e., locked +// fixStaleLocks waits for any already-locked containers (i.e., locked // by a prior dispatcher process) to appear on workers as the worker // pool recovers its state. It unlocks any that still remain when all -// workers are recovered or shutdown, or its timer expires. -func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) { - wp := pool.Subscribe() - defer pool.Unsubscribe(wp) - timeout := time.NewTimer(limit) +// workers are recovered or shutdown, or its timer +// (sch.staleLockTimeout) expires. +func (sch *Scheduler) fixStaleLocks() { + wp := sch.pool.Subscribe() + defer sch.pool.Unsubscribe(wp) + timeout := time.NewTimer(sch.staleLockTimeout) waiting: for { unlock := false @@ -28,15 +28,15 @@ waiting: // If all workers have been contacted, unlock // containers that aren't claimed by any // worker. - unlock = pool.Workers()[worker.StateUnknown] == 0 + unlock = sch.pool.Workers()[worker.StateUnknown] == 0 case <-timeout.C: // Give up and unlock the containers, even // though they might be working. unlock = true } - running := pool.Running() - qEntries, _ := queue.Entries() + running := sch.pool.Running() + qEntries, _ := sch.queue.Entries() for uuid, ent := range qEntries { if ent.Container.State != arvados.ContainerStateLocked { continue @@ -47,9 +47,9 @@ waiting: if !unlock { continue waiting } - err := queue.Unlock(uuid) + err := sch.queue.Unlock(uuid) if err != nil { - logger.Warnf("Unlock %s: %s", uuid, err) + sch.logger.Warnf("Unlock %s: %s", uuid, err) } } return