14360: Encapsulate scheduler object.
[arvados.git] / lib / dispatchcloud / scheduler / fix_stale_locks.go
index e9644aed21e6ec2af0374fa8e3c08906eff0b12b..985941090d7197c5d7e202c5aad7476f2064a763 100644 (file)
@@ -9,17 +9,17 @@ import (
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
 )
 
-// FixStaleLocks waits for any already-locked containers (i.e., locked
+// fixStaleLocks waits for any already-locked containers (i.e., locked
 // by a prior dispatcher process) to appear on workers as the worker
 // pool recovers its state. It unlocks any that still remain when all
-// workers are recovered or shutdown, or its timer expires.
-func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) {
-       wp := pool.Subscribe()
-       defer pool.Unsubscribe(wp)
-       timeout := time.NewTimer(limit)
+// workers are recovered or shut down, or its timer
+// (sch.staleLockTimeout) expires.
+func (sch *Scheduler) fixStaleLocks() {
+       wp := sch.pool.Subscribe()
+       defer sch.pool.Unsubscribe(wp)
+       timeout := time.NewTimer(sch.staleLockTimeout)
 waiting:
        for {
                unlock := false
@@ -28,15 +28,15 @@ waiting:
                        // If all workers have been contacted, unlock
                        // containers that aren't claimed by any
                        // worker.
-                       unlock = pool.Workers()[worker.StateUnknown] == 0
+                       unlock = sch.pool.Workers()[worker.StateUnknown] == 0
                case <-timeout.C:
                        // Give up and unlock the containers, even
                        // though they might be working.
                        unlock = true
                }
 
-               running := pool.Running()
-               qEntries, _ := queue.Entries()
+               running := sch.pool.Running()
+               qEntries, _ := sch.queue.Entries()
                for uuid, ent := range qEntries {
                        if ent.Container.State != arvados.ContainerStateLocked {
                                continue
@@ -47,9 +47,9 @@ waiting:
                        if !unlock {
                                continue waiting
                        }
-                       err := queue.Unlock(uuid)
+                       err := sch.queue.Unlock(uuid)
                        if err != nil {
-                               logger.Warnf("Unlock %s: %s", uuid, err)
+                               sch.logger.Warnf("Unlock %s: %s", uuid, err)
                        }
                }
                return