1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
10 "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
11 "git.curoverse.com/arvados.git/sdk/go/arvados"
12 "github.com/Sirupsen/logrus"
15 // Sync resolves discrepancies between the queue and the pool:
17 // Lingering crunch-run processes for finalized and unlocked/requeued
18 // containers are killed.
20 // Locked containers whose crunch-run processes have exited are requeued.
23 // Running containers whose crunch-run processes have exited are cancelled.
26 // Sync must not be called concurrently with other calls to Map or
27 // Sync using the same queue or pool.
28 func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
29 running := pool.Running()
30 cancel := func(ent container.QueueEnt, reason string) {
31 uuid := ent.Container.UUID
32 logger := logger.WithField("ContainerUUID", uuid)
33 logger.Infof("cancelling container because %s", reason)
34 err := queue.Cancel(uuid)
36 logger.WithError(err).Print("error cancelling container")
39 kill := func(ent container.QueueEnt) {
40 uuid := ent.Container.UUID
41 logger := logger.WithField("ContainerUUID", uuid)
42 logger.Debugf("killing crunch-run process because state=%q", ent.Container.State)
43 pool.KillContainer(uuid)
45 qEntries, qUpdated := queue.Entries()
46 for uuid, ent := range qEntries {
47 exited, running := running[uuid]
48 switch ent.Container.State {
49 case arvados.ContainerStateRunning:
51 cancel(ent, "not running on any worker")
52 } else if !exited.IsZero() && qUpdated.After(exited) {
53 cancel(ent, "state=\"Running\" after crunch-run exited")
55 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
59 logger.WithFields(logrus.Fields{
60 "ContainerUUID": uuid,
61 "State": ent.Container.State,
62 }).Info("container finished")
65 case arvados.ContainerStateQueued:
69 case arvados.ContainerStateLocked:
70 if running && !exited.IsZero() && qUpdated.After(exited) {
71 logger = logger.WithFields(logrus.Fields{
72 "ContainerUUID": uuid,
73 "Exited": time.Since(exited).Seconds(),
75 logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
76 err := queue.Unlock(uuid)
78 logger.WithError(err).Info("error requeueing container")
82 logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)