Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / lib / dispatchcloud / scheduler / fix_stale_locks.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "time"
9
10         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12 )
13
14 // fixStaleLocks waits for any already-locked containers (i.e., locked
15 // by a prior dispatcher process) to appear on workers as the worker
16 // pool recovers its state. It unlocks any that still remain when all
17 // workers are recovered or shutdown, or its timer
18 // (sch.staleLockTimeout) expires.
19 func (sch *Scheduler) fixStaleLocks() {
20         wp := sch.pool.Subscribe()
21         defer sch.pool.Unsubscribe(wp)
22
23         var stale []string
24         timeout := time.NewTimer(sch.staleLockTimeout)
25 waiting:
26         for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
27                 running := sch.pool.Running()
28                 qEntries, _ := sch.queue.Entries()
29
30                 stale = nil
31                 for uuid, ent := range qEntries {
32                         if ent.Container.State != arvados.ContainerStateLocked {
33                                 continue
34                         }
35                         if _, running := running[uuid]; running {
36                                 continue
37                         }
38                         stale = append(stale, uuid)
39                 }
40                 if len(stale) == 0 {
41                         return
42                 }
43
44                 select {
45                 case <-wp:
46                 case <-timeout.C:
47                         // Give up.
48                         break waiting
49                 }
50         }
51
52         for _, uuid := range stale {
53                 err := sch.queue.Unlock(uuid)
54                 if err != nil {
55                         sch.logger.Warnf("Unlock %s: %s", uuid, err)
56                 }
57         }
58 }