1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
10 "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
11 "git.curoverse.com/arvados.git/sdk/go/arvados"
12 "github.com/sirupsen/logrus"
15 // sync resolves discrepancies between the queue and the pool:
17 // Lingering crunch-run processes for finalized and unlocked/requeued
18 // containers are killed.
20 // Locked containers whose crunch-run processes have exited are
23 // Running containers whose crunch-run processes have exited are
25 func (sch *Scheduler) sync() {
26 running := sch.pool.Running()
27 qEntries, qUpdated := sch.queue.Entries()
28 for uuid, ent := range qEntries {
29 exited, running := running[uuid]
30 switch ent.Container.State {
31 case arvados.ContainerStateRunning:
33 go sch.cancel(uuid, "not running on any worker")
34 } else if !exited.IsZero() && qUpdated.After(exited) {
35 go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
36 } else if ent.Container.Priority == 0 {
37 go sch.kill(uuid, "priority=0")
39 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
41 // Kill crunch-run in case it's stuck;
42 // nothing it does now will matter
43 // anyway. If crunch-run has already
44 // exited and we just haven't found
45 // out about it yet, the only effect
46 // of kill() will be to make the
47 // worker available for the next
49 go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
51 sch.logger.WithFields(logrus.Fields{
52 "ContainerUUID": uuid,
53 "State": ent.Container.State,
54 }).Info("container finished")
55 sch.queue.Forget(uuid)
57 case arvados.ContainerStateQueued:
59 // Can happen if a worker returns from
60 // a network outage and is still
61 // preparing to run a container that
62 // has already been unlocked/requeued.
63 go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
65 case arvados.ContainerStateLocked:
66 if running && !exited.IsZero() && qUpdated.After(exited) {
67 go sch.requeue(ent, "crunch-run exited")
68 } else if running && exited.IsZero() && ent.Container.Priority == 0 {
69 go sch.kill(uuid, "priority=0")
70 } else if !running && ent.Container.Priority == 0 {
71 go sch.requeue(ent, "priority=0")
74 sch.logger.WithFields(logrus.Fields{
75 "ContainerUUID": uuid,
76 "State": ent.Container.State,
77 }).Error("BUG: unexpected state")
80 for uuid := range running {
81 if _, known := qEntries[uuid]; !known {
82 go sch.kill(uuid, "not in queue")
87 func (sch *Scheduler) cancel(uuid string, reason string) {
88 if !sch.uuidLock(uuid, "cancel") {
91 defer sch.uuidUnlock(uuid)
92 logger := sch.logger.WithField("ContainerUUID", uuid)
93 logger.Infof("cancelling container because %s", reason)
94 err := sch.queue.Cancel(uuid)
96 logger.WithError(err).Print("error cancelling container")
100 func (sch *Scheduler) kill(uuid string, reason string) {
101 logger := sch.logger.WithField("ContainerUUID", uuid)
102 logger.Debugf("killing crunch-run process because %s", reason)
103 sch.pool.KillContainer(uuid)
106 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
107 uuid := ent.Container.UUID
108 if !sch.uuidLock(uuid, "cancel") {
111 defer sch.uuidUnlock(uuid)
112 logger := sch.logger.WithFields(logrus.Fields{
113 "ContainerUUID": uuid,
114 "State": ent.Container.State,
115 "Priority": ent.Container.Priority,
117 logger.Infof("requeueing locked container because %s", reason)
118 err := sch.queue.Unlock(uuid)
120 logger.WithError(err).Error("error requeueing container")