1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
11 "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "github.com/Sirupsen/logrus"
16 // sync resolves discrepancies between the queue and the pool:
18 // Lingering crunch-run processes for finalized and unlocked/requeued
19 // containers are killed.
21 // Locked containers whose crunch-run processes have exited are requeued.
24 // Running containers whose crunch-run processes have exited are cancelled.
26 func (sch *Scheduler) sync() {
27 running := sch.pool.Running()
28 cancel := func(ent container.QueueEnt, reason string) {
29 uuid := ent.Container.UUID
30 logger := sch.logger.WithField("ContainerUUID", uuid)
31 logger.Infof("cancelling container because %s", reason)
32 err := sch.queue.Cancel(uuid)
34 logger.WithError(err).Print("error cancelling container")
37 kill := func(ent container.QueueEnt, reason string) {
38 uuid := ent.Container.UUID
39 logger := sch.logger.WithField("ContainerUUID", uuid)
40 logger.Debugf("killing crunch-run process because %s", reason)
41 sch.pool.KillContainer(uuid)
43 qEntries, qUpdated := sch.queue.Entries()
44 for uuid, ent := range qEntries {
45 exited, running := running[uuid]
46 switch ent.Container.State {
47 case arvados.ContainerStateRunning:
49 go cancel(ent, "not running on any worker")
50 } else if !exited.IsZero() && qUpdated.After(exited) {
51 go cancel(ent, "state=\"Running\" after crunch-run exited")
52 } else if ent.Container.Priority == 0 {
53 go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
55 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
57 // Kill crunch-run in case it's stuck;
58 // nothing it does now will matter
59 // anyway. If crunch-run has already
60 // exited and we just haven't found
61 // out about it yet, the only effect
62 // of kill() will be to make the
63 // worker available for the next container.
65 go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
67 sch.logger.WithFields(logrus.Fields{
68 "ContainerUUID": uuid,
69 "State": ent.Container.State,
70 }).Info("container finished")
71 sch.queue.Forget(uuid)
73 case arvados.ContainerStateQueued:
75 // Can happen if a worker returns from
76 // a network outage and is still
77 // preparing to run a container that
78 // has already been unlocked/requeued.
79 go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
81 case arvados.ContainerStateLocked:
82 if running && !exited.IsZero() && qUpdated.After(exited) {
83 logger := sch.logger.WithFields(logrus.Fields{
84 "ContainerUUID": uuid,
85 "Exited": time.Since(exited).Seconds(),
87 logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
88 err := sch.queue.Unlock(uuid)
90 logger.WithError(err).Info("error requeueing container")
94 sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)