// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package scheduler

import (
	"fmt"

10 "git.arvados.org/arvados.git/lib/dispatchcloud/container"
11 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
12 "git.arvados.org/arvados.git/sdk/go/arvados"
13 "github.com/sirupsen/logrus"
// sync resolves discrepancies between the queue and the pool:
//
// Lingering crunch-run processes for finalized and unlocked/requeued
// containers are killed.
//
// Locked containers whose crunch-run processes have exited are
// requeued.
//
// Running containers whose crunch-run processes have exited are
// cancelled.
func (sch *Scheduler) sync() {
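	// If any workers are in an unknown state (e.g., just after a
	// dispatcher restart), a container that appears to be running
	// nowhere might in fact be running on one of them, so the
	// "not running on any worker" cancellation below is skipped.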
	anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
	running := sch.pool.Running()
	qEntries, qUpdated := sch.queue.Entries()
	for uuid, ent := range qEntries {
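		// running (bool) reports whether the pool has a
		// crunch-run process for this container; exited is
		// that process's exit time, zero if it is still
		// running. This shadows the running map above.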
		exited, running := running[uuid]
		switch ent.Container.State {
		case arvados.ContainerStateRunning:
			if !running {
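				// Marked Running in the queue, but the
				// pool has no crunch-run process for it.
				// Unless some worker states are still
				// unknown, it can't be running anywhere,
				// so cancel it.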
				if !anyUnknownWorkers {
					go sch.cancel(uuid, "not running on any worker")
				}
			} else if !exited.IsZero() && qUpdated.After(exited) {
				go sch.cancel(uuid, "state=Running after crunch-run exited")
			} else if ent.Container.Priority == 0 {
				go sch.kill(uuid, "priority=0")
			}
		case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
			if running {
				// Kill crunch-run in case it's stuck;
				// nothing it does now will matter
				// anyway. If crunch-run has already
				// exited and we just haven't found
				// out about it yet, the only effect
				// of kill() will be to make the
				// worker available for the next
				// container.
				go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
			} else {
				sch.logger.WithFields(logrus.Fields{
					"ContainerUUID": uuid,
					"State":         ent.Container.State,
				}).Info("container finished -- dropping from queue")
				sch.queue.Forget(uuid)
			}
		case arvados.ContainerStateQueued:
			if running {
				// Can happen if a worker returns from
				// a network outage and is still
				// preparing to run a container that
				// has already been unlocked/requeued.
				go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
			} else if ent.Container.Priority == 0 {
				sch.logger.WithFields(logrus.Fields{
					"ContainerUUID": uuid,
					"State":         ent.Container.State,
					"Priority":      ent.Container.Priority,
				}).Info("container on hold -- dropping from queue")
				sch.queue.Forget(uuid)
			}
		case arvados.ContainerStateLocked:
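			// A Locked container hasn't started yet: requeue
			// it if its crunch-run process has already exited,
			// and kill or requeue it if its priority has
			// dropped to zero.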
			if running && !exited.IsZero() && qUpdated.After(exited) {
				go sch.requeue(ent, "crunch-run exited")
			} else if running && exited.IsZero() && ent.Container.Priority == 0 {
				go sch.kill(uuid, "priority=0")
			} else if !running && ent.Container.Priority == 0 {
				go sch.requeue(ent, "priority=0")
			}
		default:
			sch.logger.WithFields(logrus.Fields{
				"ContainerUUID": uuid,
				"State":         ent.Container.State,
			}).Error("BUG: unexpected state")
		}
	}
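	// Kill lingering crunch-run processes for containers that are
	// no longer in the queue at all.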
	for uuid := range running {
		if _, known := qEntries[uuid]; !known {
			go sch.kill(uuid, "not in queue")
		}
	}
}

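// cancel cancels the given container via the queue, unless another
// cancel/kill/requeue operation on the same container is already in
// progress.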
func (sch *Scheduler) cancel(uuid string, reason string) {
	if !sch.uuidLock(uuid, "cancel") {
		return
	}
	defer sch.uuidUnlock(uuid)
	logger := sch.logger.WithField("ContainerUUID", uuid)
	logger.Infof("cancelling container because %s", reason)
	err := sch.queue.Cancel(uuid)
	if err != nil {
		logger.WithError(err).Print("error cancelling container")
	}
}

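// kill terminates the crunch-run process for the given container (if
// any) and tells the worker pool to forget it, unless another
// operation on the same container is already in progress.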
func (sch *Scheduler) kill(uuid string, reason string) {
	if !sch.uuidLock(uuid, "kill") {
		return
	}
	defer sch.uuidUnlock(uuid)
	sch.pool.KillContainer(uuid, reason)
	sch.pool.ForgetContainer(uuid)
}

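// requeue unlocks a Locked container so it goes back to the queue,
// unless another operation on the same container is already in
// progress.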
func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
	uuid := ent.Container.UUID
	if !sch.uuidLock(uuid, "requeue") {
		return
	}
	defer sch.uuidUnlock(uuid)
	logger := sch.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"State":         ent.Container.State,
		"Priority":      ent.Container.Priority,
	})
	logger.Infof("requeueing locked container because %s", reason)
	err := sch.queue.Unlock(uuid)
	if err != nil {
		logger.WithError(err).Error("error requeueing container")
	}
}