1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
10 "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
11 "git.curoverse.com/arvados.git/sdk/go/arvados"
14 // fixStaleLocks waits for any already-locked containers (i.e., locked
15 // by a prior dispatcher process) to appear on workers as the worker
16 // pool recovers its state. It unlocks any that still remain when all
17 // workers are recovered or shut down, or its timer
18 // (sch.staleLockTimeout) expires.
19 func (sch *Scheduler) fixStaleLocks() {
20 wp := sch.pool.Subscribe()
21 defer sch.pool.Unsubscribe(wp)
24 timeout := time.NewTimer(sch.staleLockTimeout)
26 for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
27 running := sch.pool.Running()
28 qEntries, _ := sch.queue.Entries()
31 for uuid, ent := range qEntries {
32 if ent.Container.State != arvados.ContainerStateLocked {
35 if _, running := running[uuid]; running {
38 stale = append(stale, uuid)
52 for _, uuid := range stale {
53 err := sch.queue.Unlock(uuid)
55 sch.logger.Warnf("Unlock %s: %s", uuid, err)