// Arvados-DCO-1.1-Signed-off-by: Dung Lam <dunglam@projectnelly.com>
// Source: arvados.git lib/dispatchcloud/scheduler/sync.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package scheduler
6
7 import (
8         "fmt"
9
10         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
11         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "github.com/sirupsen/logrus"
14 )
15
// reportedUnexpectedState records whether sync has already logged its
// "BUG: unexpected state" error. sync sets it to true after the first
// report and never resets it, so that message is emitted at most once
// instead of on every sync pass.
//
// NOTE(review): this is unsynchronized package-level state shared by
// every Scheduler value; it assumes sync is never run concurrently —
// confirm against the caller.
var reportedUnexpectedState bool
17
18 // sync resolves discrepancies between the queue and the pool:
19 //
20 // Lingering crunch-run processes for finalized and unlocked/requeued
21 // containers are killed.
22 //
23 // Locked containers whose crunch-run processes have exited are
24 // requeued.
25 //
26 // Running containers whose crunch-run processes have exited are
27 // cancelled.
28 func (sch *Scheduler) sync() {
29         anyUnknownWorkers := sch.pool.CountWorkers()[worker.StateUnknown] > 0
30         running := sch.pool.Running()
31         qEntries, qUpdated := sch.queue.Entries()
32         for uuid, ent := range qEntries {
33                 exited, running := running[uuid]
34                 switch ent.Container.State {
35                 case arvados.ContainerStateRunning:
36                         if !running {
37                                 if !anyUnknownWorkers {
38                                         go sch.cancel(uuid, "not running on any worker")
39                                 }
40                         } else if !exited.IsZero() && qUpdated.After(exited) {
41                                 go sch.cancel(uuid, "state=Running after crunch-run exited")
42                         } else if ent.Container.Priority == 0 {
43                                 go sch.kill(uuid, "priority=0")
44                         }
45                 case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
46                         if running {
47                                 // Kill crunch-run in case it's stuck;
48                                 // nothing it does now will matter
49                                 // anyway. If crunch-run has already
50                                 // exited and we just haven't found
51                                 // out about it yet, the only effect
52                                 // of kill() will be to make the
53                                 // worker available for the next
54                                 // container.
55                                 go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
56                         } else {
57                                 sch.logger.WithFields(logrus.Fields{
58                                         "ContainerUUID": uuid,
59                                         "State":         ent.Container.State,
60                                 }).Info("container finished -- dropping from queue")
61                                 sch.queue.Forget(uuid)
62                         }
63                 case arvados.ContainerStateQueued:
64                         if running {
65                                 // Can happen if a worker returns from
66                                 // a network outage and is still
67                                 // preparing to run a container that
68                                 // has already been unlocked/requeued.
69                                 go sch.kill(uuid, fmt.Sprintf("pool says running, but queue says state=%s", ent.Container.State))
70                         } else if ent.Container.Priority == 0 {
71                                 sch.logger.WithFields(logrus.Fields{
72                                         "ContainerUUID": uuid,
73                                         "State":         ent.Container.State,
74                                         "Priority":      ent.Container.Priority,
75                                 }).Info("container on hold -- dropping from queue")
76                                 sch.queue.Forget(uuid)
77                         }
78                 case arvados.ContainerStateLocked:
79                         if running && !exited.IsZero() && qUpdated.After(exited) {
80                                 go sch.requeue(ent, "crunch-run exited")
81                         } else if running && exited.IsZero() && ent.Container.Priority == 0 {
82                                 go sch.kill(uuid, "priority=0")
83                         } else if !running && ent.Container.Priority == 0 {
84                                 go sch.requeue(ent, "priority=0")
85                         }
86                 default:
87                         if !reportedUnexpectedState {
88                                 sch.logger.WithFields(logrus.Fields{
89                                         "ContainerUUID": uuid,
90                                         "State":         ent.Container.State,
91                                 }).Error("BUG: unexpected state")
92                                 reportedUnexpectedState = true
93                         }
94                 }
95         }
96         for uuid := range running {
97                 if _, known := qEntries[uuid]; !known {
98                         go sch.kill(uuid, "not in queue")
99                 }
100         }
101 }
102
103 func (sch *Scheduler) cancel(uuid string, reason string) {
104         if !sch.uuidLock(uuid, "cancel") {
105                 return
106         }
107         defer sch.uuidUnlock(uuid)
108         logger := sch.logger.WithField("ContainerUUID", uuid)
109         logger.Infof("cancelling container because %s", reason)
110         err := sch.queue.Cancel(uuid)
111         if err != nil {
112                 logger.WithError(err).Print("error cancelling container")
113         }
114 }
115
116 func (sch *Scheduler) kill(uuid string, reason string) {
117         if !sch.uuidLock(uuid, "kill") {
118                 return
119         }
120         defer sch.uuidUnlock(uuid)
121         sch.logger.WithFields(logrus.Fields{
122                 "ContainerUUID": uuid,
123                 "reason":        reason,
124         }).Debug("kill")
125         sch.pool.KillContainer(uuid, reason)
126         sch.pool.ForgetContainer(uuid)
127 }
128
129 func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
130         uuid := ent.Container.UUID
131         if !sch.uuidLock(uuid, "requeue") {
132                 return
133         }
134         defer sch.uuidUnlock(uuid)
135         logger := sch.logger.WithFields(logrus.Fields{
136                 "ContainerUUID": uuid,
137                 "State":         ent.Container.State,
138                 "Priority":      ent.Container.Priority,
139         })
140         logger.Infof("requeueing locked container because %s", reason)
141         err := sch.queue.Unlock(uuid)
142         if err != nil {
143                 logger.WithError(err).Error("error requeueing container")
144         }
145 }