"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
)
-// FixStaleLocks waits for any already-locked containers (i.e., locked
+// fixStaleLocks waits for any already-locked containers (i.e., locked
// by a prior dispatcher process) to appear on workers as the worker
// pool recovers its state. It unlocks any that still remain when all
-// workers are recovered or shutdown, or its timer expires.
-func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) {
- wp := pool.Subscribe()
- defer pool.Unsubscribe(wp)
- timeout := time.NewTimer(limit)
+// workers are recovered or shut down, or its timer
+// (sch.staleLockTimeout) expires.
+func (sch *Scheduler) fixStaleLocks() {
+ wp := sch.pool.Subscribe()
+ defer sch.pool.Unsubscribe(wp)
+ timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
for {
unlock := false
// If all workers have been contacted, unlock
// containers that aren't claimed by any
// worker.
- unlock = pool.Workers()[worker.StateUnknown] == 0
+ unlock = sch.pool.Workers()[worker.StateUnknown] == 0
case <-timeout.C:
// Give up and unlock the containers, even
// though they might be working.
unlock = true
}
- running := pool.Running()
- qEntries, _ := queue.Entries()
+ running := sch.pool.Running()
+ qEntries, _ := sch.queue.Entries()
for uuid, ent := range qEntries {
if ent.Container.State != arvados.ContainerStateLocked {
continue
if !unlock {
continue waiting
}
- err := queue.Unlock(uuid)
+ err := sch.queue.Unlock(uuid)
if err != nil {
- logger.Warnf("Unlock %s: %s", uuid, err)
+ sch.logger.Warnf("Unlock %s: %s", uuid, err)
}
}
return