14360: Initial version of dispatch-cloud.
[arvados.git] / lib / dispatchcloud / scheduler / fix_stale_locks.go
diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go
new file mode 100644 (file)
index 0000000..e9644ae
--- /dev/null
@@ -0,0 +1,57 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// FixStaleLocks waits for any already-locked containers (i.e., locked
+// by a prior dispatcher process) to appear on workers as the worker
+// pool recovers its state. It unlocks any that still remain when all
+// workers are recovered or shutdown, or its timer expires.
+func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) {
+       wp := pool.Subscribe()
+       defer pool.Unsubscribe(wp)
+       timeout := time.NewTimer(limit)
+waiting:
+       for {
+               unlock := false
+               select {
+               case <-wp:
+                       // If all workers have been contacted, unlock
+                       // containers that aren't claimed by any
+                       // worker.
+                       unlock = pool.Workers()[worker.StateUnknown] == 0
+               case <-timeout.C:
+                       // Give up and unlock the containers, even
+                       // though they might be working.
+                       unlock = true
+               }
+
+               running := pool.Running()
+               qEntries, _ := queue.Entries()
+               for uuid, ent := range qEntries {
+                       if ent.Container.State != arvados.ContainerStateLocked {
+                               continue
+                       }
+                       if _, running := running[uuid]; running {
+                               continue
+                       }
+                       if !unlock {
+                               continue waiting
+                       }
+                       err := queue.Unlock(uuid)
+                       if err != nil {
+                               logger.Warnf("Unlock %s: %s", uuid, err)
+                       }
+               }
+               return
+       }
+}