14360: Encapsulate scheduler object.
[arvados.git] / lib / dispatchcloud / scheduler / fix_stale_locks.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "time"
9
10         "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12 )
13
14 // fixStaleLocks waits for any already-locked containers (i.e., locked
15 // by a prior dispatcher process) to appear on workers as the worker
16 // pool recovers its state. It unlocks any that still remain when all
17 // workers are recovered or shutdown, or its timer
18 // (sch.staleLockTimeout) expires.
19 func (sch *Scheduler) fixStaleLocks() {
20         wp := sch.pool.Subscribe()
21         defer sch.pool.Unsubscribe(wp)
22         timeout := time.NewTimer(sch.staleLockTimeout)
23 waiting:
24         for {
25                 unlock := false
26                 select {
27                 case <-wp:
28                         // If all workers have been contacted, unlock
29                         // containers that aren't claimed by any
30                         // worker.
31                         unlock = sch.pool.Workers()[worker.StateUnknown] == 0
32                 case <-timeout.C:
33                         // Give up and unlock the containers, even
34                         // though they might be working.
35                         unlock = true
36                 }
37
38                 running := sch.pool.Running()
39                 qEntries, _ := sch.queue.Entries()
40                 for uuid, ent := range qEntries {
41                         if ent.Container.State != arvados.ContainerStateLocked {
42                                 continue
43                         }
44                         if _, running := running[uuid]; running {
45                                 continue
46                         }
47                         if !unlock {
48                                 continue waiting
49                         }
50                         err := sch.queue.Unlock(uuid)
51                         if err != nil {
52                                 sch.logger.Warnf("Unlock %s: %s", uuid, err)
53                         }
54                 }
55                 return
56         }
57 }