14360: Fix tests.
[arvados.git] / lib / dispatchcloud / scheduler / sync.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "time"
9
10         "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
11         "git.curoverse.com/arvados.git/sdk/go/arvados"
12         "github.com/Sirupsen/logrus"
13 )
14
15 // Sync resolves discrepancies between the queue and the pool:
16 //
17 // Lingering crunch-run processes for finalized and unlocked/requeued
18 // containers are killed.
19 //
20 // Locked containers whose crunch-run processes have exited are
21 // requeued.
22 //
23 // Running containers whose crunch-run processes have exited are
24 // cancelled.
25 //
26 // Sync must not be called concurrently with other calls to Map or
27 // Sync using the same queue or pool.
28 func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
29         running := pool.Running()
30         cancel := func(ent container.QueueEnt, reason string) {
31                 uuid := ent.Container.UUID
32                 logger := logger.WithField("ContainerUUID", uuid)
33                 logger.Infof("cancelling container because %s", reason)
34                 err := queue.Cancel(uuid)
35                 if err != nil {
36                         logger.WithError(err).Print("error cancelling container")
37                 }
38         }
39         kill := func(ent container.QueueEnt) {
40                 uuid := ent.Container.UUID
41                 logger := logger.WithField("ContainerUUID", uuid)
42                 logger.Debugf("killing crunch-run process because state=%q", ent.Container.State)
43                 pool.KillContainer(uuid)
44         }
45         qEntries, qUpdated := queue.Entries()
46         for uuid, ent := range qEntries {
47                 exited, running := running[uuid]
48                 switch ent.Container.State {
49                 case arvados.ContainerStateRunning:
50                         if !running {
51                                 cancel(ent, "not running on any worker")
52                         } else if !exited.IsZero() && qUpdated.After(exited) {
53                                 cancel(ent, "state=\"Running\" after crunch-run exited")
54                         }
55                 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
56                         if running {
57                                 kill(ent)
58                         } else {
59                                 logger.WithFields(logrus.Fields{
60                                         "ContainerUUID": uuid,
61                                         "State":         ent.Container.State,
62                                 }).Info("container finished")
63                                 queue.Forget(uuid)
64                         }
65                 case arvados.ContainerStateQueued:
66                         if running {
67                                 kill(ent)
68                         }
69                 case arvados.ContainerStateLocked:
70                         if running && !exited.IsZero() && qUpdated.After(exited) {
71                                 logger = logger.WithFields(logrus.Fields{
72                                         "ContainerUUID": uuid,
73                                         "Exited":        time.Since(exited).Seconds(),
74                                 })
75                                 logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
76                                 err := queue.Unlock(uuid)
77                                 if err != nil {
78                                         logger.WithError(err).Info("error requeueing container")
79                                 }
80                         }
81                 default:
82                         logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
83                 }
84         }
85 }