14807: Escalate sigterm->sigkill->drain instance.
[arvados.git] / lib / dispatchcloud / scheduler / fix_stale_locks.go
index e9644aed21e6ec2af0374fa8e3c08906eff0b12b..1f9338f7b8d02e66d73239cc0f0c76ebc53ab423 100644 (file)
@@ -9,34 +9,25 @@ import (
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
 )
 
-// FixStaleLocks waits for any already-locked containers (i.e., locked
+// fixStaleLocks waits for any already-locked containers (i.e., locked
 // by a prior dispatcher process) to appear on workers as the worker
 // pool recovers its state. It unlocks any that still remain when all
-// workers are recovered or shutdown, or its timer expires.
-func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) {
-       wp := pool.Subscribe()
-       defer pool.Unsubscribe(wp)
-       timeout := time.NewTimer(limit)
+// workers are recovered or shut down, or its timer
+// (sch.staleLockTimeout) expires.
+func (sch *Scheduler) fixStaleLocks() {
+       wp := sch.pool.Subscribe()
+       defer sch.pool.Unsubscribe(wp)
+
+       var stale []string
+       timeout := time.NewTimer(sch.staleLockTimeout)
 waiting:
-       for {
-               unlock := false
-               select {
-               case <-wp:
-                       // If all workers have been contacted, unlock
-                       // containers that aren't claimed by any
-                       // worker.
-                       unlock = pool.Workers()[worker.StateUnknown] == 0
-               case <-timeout.C:
-                       // Give up and unlock the containers, even
-                       // though they might be working.
-                       unlock = true
-               }
+       for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
+               running := sch.pool.Running()
+               qEntries, _ := sch.queue.Entries()
 
-               running := pool.Running()
-               qEntries, _ := queue.Entries()
+               stale = nil
                for uuid, ent := range qEntries {
                        if ent.Container.State != arvados.ContainerStateLocked {
                                continue
@@ -44,14 +35,24 @@ waiting:
                        if _, running := running[uuid]; running {
                                continue
                        }
-                       if !unlock {
-                               continue waiting
-                       }
-                       err := queue.Unlock(uuid)
-                       if err != nil {
-                               logger.Warnf("Unlock %s: %s", uuid, err)
-                       }
+                       stale = append(stale, uuid)
+               }
+               if len(stale) == 0 {
+                       return
+               }
+
+               select {
+               case <-wp:
+               case <-timeout.C:
+                       // Give up.
+                       break waiting
+               }
+       }
+
+       for _, uuid := range stale {
+               err := sch.queue.Unlock(uuid)
+               if err != nil {
+                       sch.logger.Warnf("Unlock %s: %s", uuid, err)
                }
-               return
        }
 }