// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "context"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
        StateHold                  // running, but not available to run new containers
)
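
// Typical lifecycle: a worker starts in StateUnknown or StateBooting,
// becomes StateIdle when its boot probe succeeds, alternates between
// StateIdle and StateRunning as containers start and finish, and ends
// in StateShutdown. A worker in StateHold stays up but is skipped by
// shutdownIfBroken and is not given new containers.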

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
        StateHold:     "hold",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
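// For example, json.Marshal(map[State]int{StateIdle: 3}) produces
// {"idle":3} rather than {"2":3}.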
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}

type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx       sync.Locker // must be wp's Locker.
        state     State
        instance  cloud.Instance
        instType  arvados.InstanceType
        vcpus     int64
        memory    int64
        appeared  time.Time
        probed    time.Time
        updated   time.Time
        busy      time.Time
        destroyed time.Time
        lastUUID  string
        running   map[string]struct{} // remember to update state idle<->running when this changes
        starting  map[string]struct{} // remember to update state idle<->running when this changes
        probing   chan struct{}       // holds a token while a probe is in progress; see ProbeAndUpdate
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
                "Instance":      wkr.instance,
        })
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
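        // Start the container in a goroutine: Execute makes a round
        // trip to the instance (typically over SSH), so it must run
        // without holding wkr.mtx.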
        go func() {
                stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = struct{}{}
                wkr.lastUUID = ctr.UUID
                if err != nil {
                        logger.WithField("stdout", string(stdout)).
                                WithField("stderr", string(stderr)).
                                WithError(err).
                                Error("error starting crunch-run process")
                        // Leave uuid in wkr.running, though: it's
                        // possible the error was just a communication
                        // failure and the process was in fact
                        // started.  Wait for next probe to find out.
                        return
                }
                logger.Info("crunch-run process started")
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
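        // The non-blocking send acts as a try-lock: it succeeds only
        // when no other probe holds the token (this assumes the pool
        // creates wkr.probing with buffer size 1).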
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}

// Called from ProbeAndUpdate. Caller must not have lock.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
        needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
        wkr.mtx.Unlock()
        if !needProbeBooted && !needProbeRunning {
                return
        }

        var (
                ctrUUIDs []string
                ok       bool
                stderr   []byte
        )
        if needProbeBooted {
                ok, stderr = wkr.probeBooted()
                wkr.mtx.Lock()
                if ok || wkr.state == StateRunning || wkr.state == StateIdle {
                        wkr.logger.Info("instance booted; will try probeRunning")
                        needProbeRunning = true
                }
                wkr.mtx.Unlock()
        }
        if needProbeRunning {
                ctrUUIDs, ok, stderr = wkr.probeRunning()
        }
        logger := wkr.logger.WithField("stderr", string(stderr))
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !ok {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                dur := time.Since(wkr.probed)
                logger := logger.WithFields(logrus.Fields{
                        "Duration": dur,
                        "State":    wkr.state,
                })
                if wkr.state == StateBooting && !needProbeRunning {
                        // If we know the instance has never passed a
                        // boot probe, it's not noteworthy that it
                        // hasn't passed this probe.
                        logger.Debug("new instance not responding")
                } else {
                        logger.Info("instance not responding")
                }
                wkr.shutdownIfBroken(dur)
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
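        // Rebuild the set of running containers from the probe
        // results, noting any containers that started or ended since
        // the last update.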
        running := map[string]struct{}{}
        changed := false
        for _, uuid := range ctrUUIDs {
                running[uuid] = struct{}{}
                if _, ok := wkr.running[uuid]; !ok {
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if _, ok := running[uuid]; !ok {
                        logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
                        wkr.wp.notifyExited(uuid, updateTime)
                        changed = true
                }
        }
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                wkr.state = StateIdle
                changed = true
        }
        if changed {
                wkr.running = running
                if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                        wkr.state = StateRunning
                } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                        wkr.state = StateIdle
                }
                wkr.updated = updateTime
                go wkr.wp.notify()
        }
}

func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
        cmd := "crunch-run --list"
        stdout, stderr, err := wkr.executor.Execute(cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                return nil, false, stderr
        }
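        // crunch-run --list reports one container UUID per line;
        // empty output means no containers are running.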
        stdout = bytes.TrimRight(stdout, "\n")
        if len(stdout) == 0 {
                return nil, true, stderr
        }
        return strings.Split(string(stdout), "\n"), true, stderr
}

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        return true, stderr
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) {
        if wkr.state == StateHold {
                return
        }
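        // A still-booting instance is allowed the (typically longer)
        // boot timeout; an established instance gets the probe timeout.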
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if wkr.state != StateIdle {
                return false
        }
        age := time.Since(wkr.busy)
        if age < wkr.wp.timeoutIdle {
                return false
        }
        wkr.logger.WithField("Age", age).Info("shutdown idle worker")
        wkr.shutdown()
        return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
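        // Destroy the cloud instance in a separate goroutine: the
        // cloud API call can be slow, and the caller holds wkr.mtx.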
        go func() {
                err := wkr.instance.Destroy(context.TODO())
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                }
        }()
}