17995: Fix merge error.
[arvados.git] / lib / dispatchcloud / scheduler / sync.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "fmt"
9
10         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
11         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "github.com/sirupsen/logrus"
14 )
15
16 // sync resolves discrepancies between the queue and the pool:
17 //
18 // Lingering crunch-run processes for finalized and unlocked/requeued
19 // containers are killed.
20 //
21 // Locked containers whose crunch-run processes have exited are
22 // requeued.
23 //
24 // Running containers whose crunch-run processes have exited are
25 // cancelled.
26 func (sch *Scheduler) sync() {
27         anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
28         running := sch.pool.Running()
29         qEntries, qUpdated := sch.queue.Entries()
30         for uuid, ent := range qEntries {
31                 exited, running := running[uuid]
32                 switch ent.Container.State {
33                 case arvados.ContainerStateRunning:
34                         if !running {
35                                 if !anyUnknownWorkers {
36                                         go sch.cancel(uuid, "not running on any worker")
37                                 }
38                         } else if !exited.IsZero() && qUpdated.After(exited) {
39                                 go sch.cancel(uuid, "state=Running after crunch-run exited")
40                         } else if ent.Container.Priority == 0 {
41                                 go sch.kill(uuid, "priority=0")
42                         }
43                 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
44                         if running {
45                                 // Kill crunch-run in case it's stuck;
46                                 // nothing it does now will matter
47                                 // anyway. If crunch-run has already
48                                 // exited and we just haven't found
49                                 // out about it yet, the only effect
50                                 // of kill() will be to make the
51                                 // worker available for the next
52                                 // container.
53                                 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
54                         } else {
55                                 sch.logger.WithFields(logrus.Fields{
56                                         "ContainerUUID": uuid,
57                                         "State":         ent.Container.State,
58                                 }).Info("container finished -- dropping from queue")
59                                 sch.queue.Forget(uuid)
60                         }
61                 case arvados.ContainerStateQueued:
62                         if running {
63                                 // Can happen if a worker returns from
64                                 // a network outage and is still
65                                 // preparing to run a container that
66                                 // has already been unlocked/requeued.
67                                 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
68                         } else if ent.Container.Priority == 0 {
69                                 sch.logger.WithFields(logrus.Fields{
70                                         "ContainerUUID": uuid,
71                                         "State":         ent.Container.State,
72                                         "Priority":      ent.Container.Priority,
73                                 }).Info("container on hold -- dropping from queue")
74                                 sch.queue.Forget(uuid)
75                         }
76                 case arvados.ContainerStateLocked:
77                         if running && !exited.IsZero() && qUpdated.After(exited) {
78                                 go sch.requeue(ent, "crunch-run exited")
79                         } else if running && exited.IsZero() && ent.Container.Priority == 0 {
80                                 go sch.kill(uuid, "priority=0")
81                         } else if !running && ent.Container.Priority == 0 {
82                                 go sch.requeue(ent, "priority=0")
83                         }
84                 default:
85                         sch.logger.WithFields(logrus.Fields{
86                                 "ContainerUUID": uuid,
87                                 "State":         ent.Container.State,
88                         }).Error("BUG: unexpected state")
89                 }
90         }
91         for uuid := range running {
92                 if _, known := qEntries[uuid]; !known {
93                         go sch.kill(uuid, "not in queue")
94                 }
95         }
96 }
97
98 func (sch *Scheduler) cancel(uuid string, reason string) {
99         if !sch.uuidLock(uuid, "cancel") {
100                 return
101         }
102         defer sch.uuidUnlock(uuid)
103         logger := sch.logger.WithField("ContainerUUID", uuid)
104         logger.Infof("cancelling container because %s", reason)
105         err := sch.queue.Cancel(uuid)
106         if err != nil {
107                 logger.WithError(err).Print("error cancelling container")
108         }
109 }
110
111 func (sch *Scheduler) kill(uuid string, reason string) {
112         if !sch.uuidLock(uuid, "kill") {
113                 return
114         }
115         defer sch.uuidUnlock(uuid)
116         sch.pool.KillContainer(uuid, reason)
117         sch.pool.ForgetContainer(uuid)
118 }
119
120 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
121         uuid := ent.Container.UUID
122         if !sch.uuidLock(uuid, "requeue") {
123                 return
124         }
125         defer sch.uuidUnlock(uuid)
126         logger := sch.logger.WithFields(logrus.Fields{
127                 "ContainerUUID": uuid,
128                 "State":         ent.Container.State,
129                 "Priority":      ent.Container.Priority,
130         })
131         logger.Infof("requeueing locked container because %s", reason)
132         err := sch.queue.Unlock(uuid)
133         if err != nil {
134                 logger.WithError(err).Error("error requeueing container")
135         }
136 }