Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / lib / dispatchcloud / scheduler / sync.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "fmt"
9
10         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12         "github.com/sirupsen/logrus"
13 )
14
15 // sync resolves discrepancies between the queue and the pool:
16 //
17 // Lingering crunch-run processes for finalized and unlocked/requeued
18 // containers are killed.
19 //
20 // Locked containers whose crunch-run processes have exited are
21 // requeued.
22 //
23 // Running containers whose crunch-run processes have exited are
24 // cancelled.
25 func (sch *Scheduler) sync() {
26         running := sch.pool.Running()
27         qEntries, qUpdated := sch.queue.Entries()
28         for uuid, ent := range qEntries {
29                 exited, running := running[uuid]
30                 switch ent.Container.State {
31                 case arvados.ContainerStateRunning:
32                         if !running {
33                                 go sch.cancel(uuid, "not running on any worker")
34                         } else if !exited.IsZero() && qUpdated.After(exited) {
35                                 go sch.cancel(uuid, "state=Running after crunch-run exited")
36                         } else if ent.Container.Priority == 0 {
37                                 go sch.kill(uuid, "priority=0")
38                         }
39                 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
40                         if running {
41                                 // Kill crunch-run in case it's stuck;
42                                 // nothing it does now will matter
43                                 // anyway. If crunch-run has already
44                                 // exited and we just haven't found
45                                 // out about it yet, the only effect
46                                 // of kill() will be to make the
47                                 // worker available for the next
48                                 // container.
49                                 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
50                         } else {
51                                 sch.logger.WithFields(logrus.Fields{
52                                         "ContainerUUID": uuid,
53                                         "State":         ent.Container.State,
54                                 }).Info("container finished -- dropping from queue")
55                                 sch.queue.Forget(uuid)
56                         }
57                 case arvados.ContainerStateQueued:
58                         if running {
59                                 // Can happen if a worker returns from
60                                 // a network outage and is still
61                                 // preparing to run a container that
62                                 // has already been unlocked/requeued.
63                                 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
64                         } else if ent.Container.Priority == 0 {
65                                 sch.logger.WithFields(logrus.Fields{
66                                         "ContainerUUID": uuid,
67                                         "State":         ent.Container.State,
68                                         "Priority":      ent.Container.Priority,
69                                 }).Info("container on hold -- dropping from queue")
70                                 sch.queue.Forget(uuid)
71                         }
72                 case arvados.ContainerStateLocked:
73                         if running && !exited.IsZero() && qUpdated.After(exited) {
74                                 go sch.requeue(ent, "crunch-run exited")
75                         } else if running && exited.IsZero() && ent.Container.Priority == 0 {
76                                 go sch.kill(uuid, "priority=0")
77                         } else if !running && ent.Container.Priority == 0 {
78                                 go sch.requeue(ent, "priority=0")
79                         }
80                 default:
81                         sch.logger.WithFields(logrus.Fields{
82                                 "ContainerUUID": uuid,
83                                 "State":         ent.Container.State,
84                         }).Error("BUG: unexpected state")
85                 }
86         }
87         for uuid := range running {
88                 if _, known := qEntries[uuid]; !known {
89                         go sch.kill(uuid, "not in queue")
90                 }
91         }
92 }
93
94 func (sch *Scheduler) cancel(uuid string, reason string) {
95         if !sch.uuidLock(uuid, "cancel") {
96                 return
97         }
98         defer sch.uuidUnlock(uuid)
99         logger := sch.logger.WithField("ContainerUUID", uuid)
100         logger.Infof("cancelling container because %s", reason)
101         err := sch.queue.Cancel(uuid)
102         if err != nil {
103                 logger.WithError(err).Print("error cancelling container")
104         }
105 }
106
107 func (sch *Scheduler) kill(uuid string, reason string) {
108         sch.pool.KillContainer(uuid, reason)
109         sch.pool.ForgetContainer(uuid)
110 }
111
112 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
113         uuid := ent.Container.UUID
114         if !sch.uuidLock(uuid, "cancel") {
115                 return
116         }
117         defer sch.uuidUnlock(uuid)
118         logger := sch.logger.WithFields(logrus.Fields{
119                 "ContainerUUID": uuid,
120                 "State":         ent.Container.State,
121                 "Priority":      ent.Container.Priority,
122         })
123         logger.Infof("requeueing locked container because %s", reason)
124         err := sch.queue.Unlock(uuid)
125         if err != nil {
126                 logger.WithError(err).Error("error requeueing container")
127         }
128 }