1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
10 "git.arvados.org/arvados.git/lib/dispatchcloud/container"
11 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "github.com/sirupsen/logrus"
// reportedUnexpectedState records whether sync has already logged a
// "BUG: unexpected state" error, so that the error is logged at most
// once rather than on every sync pass.
var reportedUnexpectedState bool
18 // sync resolves discrepancies between the queue and the pool:
20 // Lingering crunch-run processes for finalized and unlocked/requeued
21 // containers are killed.
23 // Locked containers whose crunch-run processes have exited are
26 // Running containers whose crunch-run processes have exited are
28 func (sch *Scheduler) sync() {
29 anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
30 running := sch.pool.Running()
31 qEntries, qUpdated := sch.queue.Entries()
32 for uuid, ent := range qEntries {
33 exited, running := running[uuid]
34 switch ent.Container.State {
35 case arvados.ContainerStateRunning:
37 if !anyUnknownWorkers {
38 go sch.cancel(uuid, "not running on any worker")
40 } else if !exited.IsZero() && qUpdated.After(exited) {
41 go sch.cancel(uuid, "state=Running after crunch-run exited")
42 } else if ent.Container.Priority == 0 {
43 go sch.kill(uuid, "priority=0")
45 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
47 // Kill crunch-run in case it's stuck;
48 // nothing it does now will matter
49 // anyway. If crunch-run has already
50 // exited and we just haven't found
51 // out about it yet, the only effect
52 // of kill() will be to make the
53 // worker available for the next
55 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
57 sch.logger.WithFields(logrus.Fields{
58 "ContainerUUID": uuid,
59 "State": ent.Container.State,
60 }).Info("container finished -- dropping from queue")
61 sch.queue.Forget(uuid)
63 case arvados.ContainerStateQueued:
65 // Can happen if a worker returns from
66 // a network outage and is still
67 // preparing to run a container that
68 // has already been unlocked/requeued.
69 go sch.kill(uuid, fmt.Sprintf("pool says running, but queue says state=%s", ent.Container.State))
70 } else if ent.Container.Priority == 0 {
71 sch.logger.WithFields(logrus.Fields{
72 "ContainerUUID": uuid,
73 "State": ent.Container.State,
74 "Priority": ent.Container.Priority,
75 }).Info("container on hold -- dropping from queue")
76 sch.queue.Forget(uuid)
78 case arvados.ContainerStateLocked:
79 if running && !exited.IsZero() && qUpdated.After(exited) {
80 go sch.requeue(ent, "crunch-run exited")
81 } else if running && exited.IsZero() && ent.Container.Priority == 0 {
82 go sch.kill(uuid, "priority=0")
83 } else if !running && ent.Container.Priority == 0 {
84 go sch.requeue(ent, "priority=0")
87 if !reportedUnexpectedState {
88 sch.logger.WithFields(logrus.Fields{
89 "ContainerUUID": uuid,
90 "State": ent.Container.State,
91 }).Error("BUG: unexpected state")
92 reportedUnexpectedState = true
96 for uuid := range running {
97 if _, known := qEntries[uuid]; !known {
98 go sch.kill(uuid, "not in queue")
103 func (sch *Scheduler) cancel(uuid string, reason string) {
104 if !sch.uuidLock(uuid, "cancel") {
107 defer sch.uuidUnlock(uuid)
108 logger := sch.logger.WithField("ContainerUUID", uuid)
109 logger.Infof("cancelling container because %s", reason)
110 err := sch.queue.Cancel(uuid)
112 logger.WithError(err).Print("error cancelling container")
116 func (sch *Scheduler) kill(uuid string, reason string) {
117 if !sch.uuidLock(uuid, "kill") {
120 defer sch.uuidUnlock(uuid)
121 sch.logger.WithFields(logrus.Fields{
122 "ContainerUUID": uuid,
125 sch.pool.KillContainer(uuid, reason)
126 sch.pool.ForgetContainer(uuid)
129 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
130 uuid := ent.Container.UUID
131 if !sch.uuidLock(uuid, "requeue") {
134 defer sch.uuidUnlock(uuid)
135 logger := sch.logger.WithFields(logrus.Fields{
136 "ContainerUUID": uuid,
137 "State": ent.Container.State,
138 "Priority": ent.Container.Priority,
140 logger.Infof("requeueing locked container because %s", reason)
141 err := sch.queue.Unlock(uuid)
143 logger.WithError(err).Error("error requeueing container")