X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3c538444c15e68e96780f157935f2baa4ba0bc5..0a6c326074bdba18a13428f4580a313a6b5d7687:/lib/dispatchcloud/scheduler/fix_stale_locks.go diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go index e9644aed21..dbd8b609a9 100644 --- a/lib/dispatchcloud/scheduler/fix_stale_locks.go +++ b/lib/dispatchcloud/scheduler/fix_stale_locks.go @@ -7,36 +7,27 @@ package scheduler import ( "time" - "git.curoverse.com/arvados.git/lib/dispatchcloud/worker" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/Sirupsen/logrus" + "git.arvados.org/arvados.git/lib/dispatchcloud/worker" + "git.arvados.org/arvados.git/sdk/go/arvados" ) -// FixStaleLocks waits for any already-locked containers (i.e., locked +// fixStaleLocks waits for any already-locked containers (i.e., locked // by a prior dispatcher process) to appear on workers as the worker // pool recovers its state. It unlocks any that still remain when all -// workers are recovered or shutdown, or its timer expires. -func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) { - wp := pool.Subscribe() - defer pool.Unsubscribe(wp) - timeout := time.NewTimer(limit) +// workers are recovered or shutdown, or its timer +// (sch.staleLockTimeout) expires. +func (sch *Scheduler) fixStaleLocks() { + wp := sch.pool.Subscribe() + defer sch.pool.Unsubscribe(wp) + + var stale []string + timeout := time.NewTimer(sch.staleLockTimeout) waiting: - for { - unlock := false - select { - case <-wp: - // If all workers have been contacted, unlock - // containers that aren't claimed by any - // worker. - unlock = pool.Workers()[worker.StateUnknown] == 0 - case <-timeout.C: - // Give up and unlock the containers, even - // though they might be working. - unlock = true - } + for sch.pool.CountWorkers()[worker.StateUnknown] > 0 { + running := sch.pool.Running() + qEntries, _ := sch.queue.Entries() - running := pool.Running() - qEntries, _ := queue.Entries() + stale = nil for uuid, ent := range qEntries { if ent.Container.State != arvados.ContainerStateLocked { continue @@ -44,14 +35,24 @@ waiting: if _, running := running[uuid]; running { continue } - if !unlock { - continue waiting - } - err := queue.Unlock(uuid) - if err != nil { - logger.Warnf("Unlock %s: %s", uuid, err) - } + stale = append(stale, uuid) + } + if len(stale) == 0 { + return + } + + select { + case <-wp: + case <-timeout.C: + // Give up. + break waiting + } + } + + for _, uuid := range stale { + err := sch.queue.Unlock(uuid) + if err != nil { + sch.logger.Warnf("Unlock %s: %s", uuid, err) } - return } }