1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
10 "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
11 "git.curoverse.com/arvados.git/sdk/go/arvados"
12 "github.com/sirupsen/logrus"
15 // sync resolves discrepancies between the queue and the pool:
// it takes one snapshot of sch.pool.Running() and one of
// sch.queue.Entries(), then handles each queue entry according to its
// container state.
17 // Lingering crunch-run processes for finalized and unlocked/requeued
18 // containers are killed.
20 // Locked containers whose crunch-run processes have exited are requeued.
23 // Running containers whose crunch-run processes have exited are cancelled.
//
// Each cancel, kill and requeue call visible here is launched with
// `go`; the "container finished" logging and sch.queue.Forget are
// done inline in the loop.
//
// NOTE(review): the embedded line numbers in this extract skip in
// several places, so some guard conditions and branch separators are
// not visible. Comments below flag each gap.
25 func (sch *Scheduler) sync() {
26 running := sch.pool.Running()
27 qEntries, qUpdated := sch.queue.Entries()
28 for uuid, ent := range qEntries {
// Two-value map lookup. From here to the end of the iteration the
// name "running" is the lookup's "found" bool and shadows the map
// declared above. exited is the map's value for this UUID; the
// branches below treat a zero value as "no exit recorded"
// (presumably an exit timestamp -- confirm against pool.Running).
29 exited, running := running[uuid]
30 switch ent.Container.State {
31 case arvados.ContainerStateRunning:
// NOTE(review): numbering jumps 31 -> 33, so the `if` that opens this
// chain is not visible. Judging by the reason string it likely tests
// that the container was not found in the pool -- confirm.
33 go sch.cancel(ent, "not running on any worker")
// An exit is recorded and the queue snapshot is newer than it, yet
// the container state is still Running.
34 } else if !exited.IsZero() && qUpdated.After(exited) {
35 go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
// Priority has been set to zero: kill the crunch-run process.
36 } else if ent.Container.Priority == 0 {
37 go sch.kill(ent, "priority=0")
39 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
// NOTE(review): numbering jumps 39 -> 41; the condition guarding the
// kill below is not visible in this extract.
41 // Kill crunch-run in case it's stuck;
42 // nothing it does now will matter
43 // anyway. If crunch-run has already
44 // exited and we just haven't found
45 // out about it yet, the only effect
46 // of kill() will be to make the
47 // worker available for the next container.
49 go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
// NOTE(review): numbering jumps 49 -> 51; the separator between the
// kill above and the bookkeeping below is not visible (presumably an
// else branch -- confirm).
// Log that the container finished, then ask the queue to forget it.
51 sch.logger.WithFields(logrus.Fields{
52 "ContainerUUID": uuid,
53 "State": ent.Container.State,
54 }).Info("container finished")
55 sch.queue.Forget(uuid)
57 case arvados.ContainerStateQueued:
// NOTE(review): numbering jumps 57 -> 59; the condition guarding the
// kill below is not visible in this extract.
59 // Can happen if a worker returns from
60 // a network outage and is still
61 // preparing to run a container that
62 // has already been unlocked/requeued.
63 go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
65 case arvados.ContainerStateLocked:
// Found in the pool, an exit is recorded, and the queue snapshot is
// newer than that exit: requeue (requeue calls sch.queue.Unlock).
66 if running && !exited.IsZero() && qUpdated.After(exited) {
67 go sch.requeue(ent, "crunch-run exited")
// Found in the pool with no exit recorded, but priority is zero:
// kill the crunch-run process.
68 } else if running && exited.IsZero() && ent.Container.Priority == 0 {
69 go sch.kill(ent, "priority=0")
// Not found in the pool and priority is zero: requeue.
70 } else if !running && ent.Container.Priority == 0 {
71 go sch.requeue(ent, "priority=0")
// NOTE(review): numbering jumps 71 -> 74; the case label for the
// logging below is not visible. The message suggests a catch-all for
// states not handled above -- confirm.
74 sch.logger.WithFields(logrus.Fields{
75 "ContainerUUID": uuid,
76 "State": ent.Container.State,
77 }).Error("BUG: unexpected state")
// cancel asks the queue to cancel the container identified by
// ent.Container.UUID. reason is only used in the log message that
// announces the cancellation.
//
// The call to sch.queue.Cancel is bracketed by
// sch.uuidLock(uuid, "cancel") and a deferred sch.uuidUnlock(uuid).
// The method returns nothing; an error from sch.queue.Cancel is
// logged rather than propagated.
82 func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
83 uuid := ent.Container.UUID
// NOTE(review): numbering jumps 84 -> 87, so the body of this guard is
// not visible. Presumably it returns early when uuidLock reports
// false -- confirm.
84 if !sch.uuidLock(uuid, "cancel") {
87 defer sch.uuidUnlock(uuid)
88 logger := sch.logger.WithField("ContainerUUID", uuid)
89 logger.Infof("cancelling container because %s", reason)
90 err := sch.queue.Cancel(uuid)
// NOTE(review): numbering jumps 90 -> 92; the err check that
// presumably guards this line is not visible. Also note the failure
// is logged with Print here while requeue logs its failure with
// Error -- confirm the intended level.
92 logger.WithError(err).Print("error cancelling container")
// kill logs reason at debug level and then calls
// sch.pool.KillContainer for the container identified by
// ent.Container.UUID.
//
// NOTE(review): unlike cancel and requeue, no uuidLock/uuidUnlock
// pair is visible in this method -- confirm that is intentional.
96 func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
97 uuid := ent.Container.UUID
98 logger := sch.logger.WithField("ContainerUUID", uuid)
99 logger.Debugf("killing crunch-run process because %s", reason)
100 sch.pool.KillContainer(uuid)
// requeue asks the queue to unlock the container identified by
// ent.Container.UUID (via sch.queue.Unlock). reason is only used in
// the log message that announces the requeue.
//
// The call is bracketed by sch.uuidLock and a deferred
// sch.uuidUnlock(uuid). The method returns nothing; an error from
// sch.queue.Unlock is logged at error level rather than propagated.
103 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
104 uuid := ent.Container.UUID
// NOTE(review): the label passed to uuidLock is "cancel", the same
// string cancel() passes. This looks like a copy-paste; confirm
// whether "requeue" was intended.
// NOTE(review): numbering jumps 105 -> 108, so the body of this guard
// is not visible. Presumably it returns early when uuidLock reports
// false -- confirm.
105 if !sch.uuidLock(uuid, "cancel") {
108 defer sch.uuidUnlock(uuid)
// Log entries from here on carry the container's UUID, state and
// priority as fields.
109 logger := sch.logger.WithFields(logrus.Fields{
110 "ContainerUUID": uuid,
111 "State": ent.Container.State,
112 "Priority": ent.Container.Priority,
// NOTE(review): numbering jumps 112 -> 114; the line closing the
// Fields literal is not visible in this extract.
114 logger.Infof("requeueing locked container because %s", reason)
115 err := sch.queue.Unlock(uuid)
// NOTE(review): numbering jumps 115 -> 117; the err check that
// presumably guards this line is not visible.
117 logger.WithError(err).Error("error requeueing container")