1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
10 "git.arvados.org/arvados.git/lib/dispatchcloud/container"
11 "git.arvados.org/arvados.git/sdk/go/arvados"
12 "github.com/sirupsen/logrus"
15 // sync resolves discrepancies between the queue and the pool:
17 // Lingering crunch-run processes for finalized and unlocked/requeued
18 // containers are killed.
20 // Locked containers whose crunch-run processes have exited are
23 // Running containers whose crunch-run processes have exited are
25 func (sch *Scheduler) sync() {
26 running := sch.pool.Running()
27 qEntries, qUpdated := sch.queue.Entries()
28 for uuid, ent := range qEntries {
29 exited, running := running[uuid]
30 switch ent.Container.State {
31 case arvados.ContainerStateRunning:
33 go sch.cancel(uuid, "not running on any worker")
34 } else if !exited.IsZero() && qUpdated.After(exited) {
35 go sch.cancel(uuid, "state=Running after crunch-run exited")
36 } else if ent.Container.Priority == 0 {
37 go sch.kill(uuid, "priority=0")
39 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
41 // Kill crunch-run in case it's stuck;
42 // nothing it does now will matter
43 // anyway. If crunch-run has already
44 // exited and we just haven't found
45 // out about it yet, the only effect
46 // of kill() will be to make the
47 // worker available for the next
49 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
51 sch.logger.WithFields(logrus.Fields{
52 "ContainerUUID": uuid,
53 "State": ent.Container.State,
54 }).Info("container finished -- dropping from queue")
55 sch.queue.Forget(uuid)
57 case arvados.ContainerStateQueued:
59 // Can happen if a worker returns from
60 // a network outage and is still
61 // preparing to run a container that
62 // has already been unlocked/requeued.
63 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
64 } else if ent.Container.Priority == 0 {
65 sch.logger.WithFields(logrus.Fields{
66 "ContainerUUID": uuid,
67 "State": ent.Container.State,
68 "Priority": ent.Container.Priority,
69 }).Info("container on hold -- dropping from queue")
70 sch.queue.Forget(uuid)
72 case arvados.ContainerStateLocked:
73 if running && !exited.IsZero() && qUpdated.After(exited) {
74 go sch.requeue(ent, "crunch-run exited")
75 } else if running && exited.IsZero() && ent.Container.Priority == 0 {
76 go sch.kill(uuid, "priority=0")
77 } else if !running && ent.Container.Priority == 0 {
78 go sch.requeue(ent, "priority=0")
81 sch.logger.WithFields(logrus.Fields{
82 "ContainerUUID": uuid,
83 "State": ent.Container.State,
84 }).Error("BUG: unexpected state")
87 for uuid := range running {
88 if _, known := qEntries[uuid]; !known {
89 go sch.kill(uuid, "not in queue")
94 func (sch *Scheduler) cancel(uuid string, reason string) {
95 if !sch.uuidLock(uuid, "cancel") {
98 defer sch.uuidUnlock(uuid)
99 logger := sch.logger.WithField("ContainerUUID", uuid)
100 logger.Infof("cancelling container because %s", reason)
101 err := sch.queue.Cancel(uuid)
103 logger.WithError(err).Print("error cancelling container")
107 func (sch *Scheduler) kill(uuid string, reason string) {
108 sch.pool.KillContainer(uuid, reason)
109 sch.pool.ForgetContainer(uuid)
112 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
113 uuid := ent.Container.UUID
114 if !sch.uuidLock(uuid, "cancel") {
117 defer sch.uuidUnlock(uuid)
118 logger := sch.logger.WithFields(logrus.Fields{
119 "ContainerUUID": uuid,
120 "State": ent.Container.State,
121 "Priority": ent.Container.Priority,
123 logger.Infof("requeueing locked container because %s", reason)
124 err := sch.queue.Unlock(uuid)
126 logger.WithError(err).Error("error requeueing container")