--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
+)
+
+// FixStaleLocks waits for any already-locked containers (i.e., locked
+// by a prior dispatcher process) to appear on workers as the worker
+// pool recovers its state. It unlocks any that still remain when all
+// workers are recovered or shutdown, or its timer expires.
+func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) {
+ wp := pool.Subscribe()
+ defer pool.Unsubscribe(wp)
+ timeout := time.NewTimer(limit)
+waiting:
+ for {
+ unlock := false
+ select {
+ case <-wp:
+ // If all workers have been contacted, unlock
+ // containers that aren't claimed by any
+ // worker.
+ unlock = pool.Workers()[worker.StateUnknown] == 0
+ case <-timeout.C:
+ // Give up and unlock the containers, even
+ // though they might be working.
+ unlock = true
+ }
+
+ running := pool.Running()
+ qEntries, _ := queue.Entries()
+ for uuid, ent := range qEntries {
+ if ent.Container.State != arvados.ContainerStateLocked {
+ continue
+ }
+ if _, running := running[uuid]; running {
+ continue
+ }
+ if !unlock {
+ continue waiting
+ }
+ err := queue.Unlock(uuid)
+ if err != nil {
+ logger.Warnf("Unlock %s: %s", uuid, err)
+ }
+ }
+ return
+ }
+}