14807: Escalate sigterm->sigkill->drain instance.
[arvados.git] / lib / dispatchcloud / scheduler / fix_stale_locks.go
index 264f9e4ec6bbc3747401858a37c3f70b259116c1..1f9338f7b8d02e66d73239cc0f0c76ebc53ab423 100644 (file)
@@ -19,39 +19,56 @@ import (
 func (sch *Scheduler) fixStaleLocks() {
        wp := sch.pool.Subscribe()
        defer sch.pool.Unsubscribe(wp)
+
        timeout := time.NewTimer(sch.staleLockTimeout)
-waiting:
+       defer timeout.Stop()
+
+       giveUp := false
        for {
-               unlock := false
-               select {
-               case <-wp:
-                       // If all workers have been contacted, unlock
-                       // containers that aren't claimed by any
-                       // worker.
-                       unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0
-               case <-timeout.C:
-                       // Give up and unlock the containers, even
-                       // though they might be working.
-                       unlock = true
-               }
-
+               // Sample the worker states before taking the
+               // Running() snapshot, so that if every worker had
+               // already been contacted, the snapshot covers them all.
+               allKnown := sch.pool.CountWorkers()[worker.StateUnknown] == 0
                running := sch.pool.Running()
                qEntries, _ := sch.queue.Entries()
+
+               // Locked containers that no worker reports as running.
+               var stale []string
                for uuid, ent := range qEntries {
                        if ent.Container.State != arvados.ContainerStateLocked {
                                continue
                        }
                        if _, running := running[uuid]; running {
                                continue
                        }
-                       if !unlock {
-                               continue waiting
-                       }
-                       err := sch.queue.Unlock(uuid)
-                       if err != nil {
-                               sch.logger.Warnf("Unlock %s: %s", uuid, err)
-                       }
+                       stale = append(stale, uuid)
                }
-               return
+               if len(stale) == 0 {
+                       return
+               }
+
+               if allKnown || giveUp {
+                       // Either every worker has been contacted, so
+                       // these containers aren't claimed by any worker,
+                       // or the timeout expired and we give up, even
+                       // though some of them might still be running on
+                       // a worker we haven't heard from.
+                       for _, uuid := range stale {
+                               err := sch.queue.Unlock(uuid)
+                               if err != nil {
+                                       sch.logger.Warnf("Unlock %s: %s", uuid, err)
+                               }
+                       }
+                       return
+               }
+
+               // Some workers haven't reported yet. Wait for a pool
+               // state change (or the timeout), then re-check with
+               // fresh data.
+               select {
+               case <-wp:
+               case <-timeout.C:
+                       giveUp = true
+               }
        }
 }