14807: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / scheduler / sync.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "fmt"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "github.com/sirupsen/logrus"
14 )
15
16 // sync resolves discrepancies between the queue and the pool:
17 //
18 // Lingering crunch-run processes for finalized and unlocked/requeued
19 // containers are killed.
20 //
21 // Locked containers whose crunch-run processes have exited are
22 // requeued.
23 //
24 // Running containers whose crunch-run processes have exited are
25 // cancelled.
26 func (sch *Scheduler) sync() {
27         running := sch.pool.Running()
28         cancel := func(ent container.QueueEnt, reason string) {
29                 uuid := ent.Container.UUID
30                 logger := sch.logger.WithField("ContainerUUID", uuid)
31                 logger.Infof("cancelling container because %s", reason)
32                 err := sch.queue.Cancel(uuid)
33                 if err != nil {
34                         logger.WithError(err).Print("error cancelling container")
35                 }
36         }
37         kill := func(ent container.QueueEnt, reason string) {
38                 uuid := ent.Container.UUID
39                 logger := sch.logger.WithField("ContainerUUID", uuid)
40                 logger.Debugf("killing crunch-run process because %s", reason)
41                 sch.pool.KillContainer(uuid)
42         }
43         qEntries, qUpdated := sch.queue.Entries()
44         for uuid, ent := range qEntries {
45                 exited, running := running[uuid]
46                 switch ent.Container.State {
47                 case arvados.ContainerStateRunning:
48                         if !running {
49                                 go cancel(ent, "not running on any worker")
50                         } else if !exited.IsZero() && qUpdated.After(exited) {
51                                 go cancel(ent, "state=\"Running\" after crunch-run exited")
52                         } else if ent.Container.Priority == 0 {
53                                 go kill(ent, "priority=0")
54                         }
55                 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
56                         if running {
57                                 // Kill crunch-run in case it's stuck;
58                                 // nothing it does now will matter
59                                 // anyway. If crunch-run has already
60                                 // exited and we just haven't found
61                                 // out about it yet, the only effect
62                                 // of kill() will be to make the
63                                 // worker available for the next
64                                 // container.
65                                 go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
66                         } else {
67                                 sch.logger.WithFields(logrus.Fields{
68                                         "ContainerUUID": uuid,
69                                         "State":         ent.Container.State,
70                                 }).Info("container finished")
71                                 sch.queue.Forget(uuid)
72                         }
73                 case arvados.ContainerStateQueued:
74                         if running {
75                                 // Can happen if a worker returns from
76                                 // a network outage and is still
77                                 // preparing to run a container that
78                                 // has already been unlocked/requeued.
79                                 go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
80                         }
81                 case arvados.ContainerStateLocked:
82                         if running && !exited.IsZero() && qUpdated.After(exited) {
83                                 logger := sch.logger.WithFields(logrus.Fields{
84                                         "ContainerUUID": uuid,
85                                         "State":         ent.Container.State,
86                                         "Priority":      ent.Container.Priority,
87                                         "Exited":        time.Since(exited).Seconds(),
88                                 })
89                                 logger.Info("requeueing locked container after crunch-run exited")
90                                 err := sch.queue.Unlock(uuid)
91                                 if err != nil {
92                                         logger.WithError(err).Error("error requeueing container")
93                                 }
94                         } else if running && exited.IsZero() && ent.Container.Priority == 0 {
95                                 go kill(ent, "priority=0")
96                         } else if !running && ent.Container.Priority == 0 {
97                                 logger := sch.logger.WithField("ContainerUUID", uuid)
98                                 logger.Info("unlocking container because priority=0")
99                                 err := sch.queue.Unlock(uuid)
100                                 if err != nil {
101                                         logger.WithError(err).Error("error requeueing container")
102                                 }
103                         }
104                 default:
105                         sch.logger.WithFields(logrus.Fields{
106                                 "ContainerUUID": uuid,
107                                 "State":         ent.Container.State,
108                         }).Error("BUG: unexpected state")
109                 }
110         }
111 }