14325: Rephrase confusing conditions and add comments.
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "fmt"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/lib/cloud"
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/sirupsen/logrus"
17 )
18
// maxPingFailTime is currently hard-coded to ten minutes.
//
// TODO: configurable
const maxPingFailTime = 10 * time.Minute
23
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

// stateString maps every defined State to its human-readable name.
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
//
// A defined State is rendered using its name from stateString. Any
// other value is rendered as "State(N)" instead of an empty string,
// so that log lines and panic messages still identify the offending
// value.
func (s State) String() string {
	if name, ok := stateString[s]; ok {
		return name
	}
	return fmt.Sprintf("State(%d)", int(s))
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation. It
// never returns a non-nil error.
func (s State) MarshalText() ([]byte, error) {
	return []byte(s.String()), nil
}
54
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

// Recognized IdleBehavior values.
const (
	// IdleBehaviorRun: run containers, or shutdown on idle timeout.
	IdleBehaviorRun IdleBehavior = "run"
	// IdleBehaviorHold: don't shutdown or run more containers.
	IdleBehaviorHold IdleBehavior = "hold"
	// IdleBehaviorDrain: shutdown immediately when idle.
	IdleBehaviorDrain IdleBehavior = "drain"
)

// validIdleBehavior is the set of recognized IdleBehavior values;
// looking up any other value yields false.
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorDrain: true,
	IdleBehaviorHold:  true,
	IdleBehaviorRun:   true,
}
69
// worker holds the dispatcher's bookkeeping for a single cloud
// instance: its lifecycle state, the containers being started or
// running on it, and the timestamps used to decide when to probe it
// or shut it down.
type worker struct {
	logger logrus.FieldLogger
	// executor runs commands on the instance (the boot probe,
	// "crunch-run --list", and "crunch-run --detach").
	executor Executor
	// wp is the pool this worker belongs to; it supplies the API
	// client credentials, probe command, timeouts, and
	// notification hooks used by the methods below.
	wp *Pool

	mtx sync.Locker // must be wp's Locker.
	// The remaining fields are intended to be accessed only with
	// mtx held.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	// NOTE(review): vcpus, memory and appeared are not used in this
	// part of the file -- presumably resource accounting and the
	// time the instance was first seen; confirm against the pool
	// code.
	vcpus    int64
	memory   int64
	appeared time.Time
	// probed is the time of the most recent probe that
	// probeAndUpdate considered successful.
	probed time.Time
	// updated is the time of the last change made by
	// startContainer, probeAndUpdate, or shutdown. probeAndUpdate
	// compares it before and after probing to detect concurrent
	// changes.
	updated time.Time
	// busy is the last time the worker was known to have
	// containers; shutdownIfIdle measures idle age from it.
	busy time.Time
	// destroyed is set by shutdown.
	destroyed time.Time
	// lastUUID is the UUID of the container most recently started
	// here, or the first one reported by the latest probe.
	lastUUID string
	running  map[string]struct{} // remember to update state idle<->running when this changes
	starting map[string]struct{} // remember to update state idle<->running when this changes
	// probing is used by ProbeAndUpdate as a try-lock so that at
	// most one probe runs at a time (assumes it is created with a
	// buffer of 1 -- TODO confirm where the worker is constructed).
	probing chan struct{}
}
92
93 // caller must have lock.
94 func (wkr *worker) startContainer(ctr arvados.Container) {
95         logger := wkr.logger.WithFields(logrus.Fields{
96                 "ContainerUUID": ctr.UUID,
97                 "Priority":      ctr.Priority,
98         })
99         logger = logger.WithField("Instance", wkr.instance)
100         logger.Debug("starting container")
101         wkr.starting[ctr.UUID] = struct{}{}
102         wkr.state = StateRunning
103         go func() {
104                 env := map[string]string{
105                         "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
106                         "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
107                 }
108                 stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
109                 wkr.mtx.Lock()
110                 defer wkr.mtx.Unlock()
111                 now := time.Now()
112                 wkr.updated = now
113                 wkr.busy = now
114                 delete(wkr.starting, ctr.UUID)
115                 wkr.running[ctr.UUID] = struct{}{}
116                 wkr.lastUUID = ctr.UUID
117                 if err != nil {
118                         logger.WithField("stdout", string(stdout)).
119                                 WithField("stderr", string(stderr)).
120                                 WithError(err).
121                                 Error("error starting crunch-run process")
122                         // Leave uuid in wkr.running, though: it's
123                         // possible the error was just a communication
124                         // failure and the process was in fact
125                         // started.  Wait for next probe to find out.
126                         return
127                 }
128                 logger.Info("crunch-run process started")
129                 wkr.lastUUID = ctr.UUID
130         }()
131 }
132
133 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
134 // for the worker's curent state. If a previous probe is still
135 // running, it does nothing.
136 //
137 // It should be called in a new goroutine.
138 func (wkr *worker) ProbeAndUpdate() {
139         select {
140         case wkr.probing <- struct{}{}:
141                 wkr.probeAndUpdate()
142                 <-wkr.probing
143         default:
144                 wkr.logger.Debug("still waiting for last probe to finish")
145         }
146 }
147
148 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
149 // updates state accordingly.
150 //
151 // In StateUnknown: Call both probeBooted and probeRunning.
152 // In StateBooting: Call probeBooted; if successful, call probeRunning.
153 // In StateRunning: Call probeRunning.
154 // In StateIdle: Call probeRunning.
155 // In StateShutdown: Do nothing.
156 //
157 // If both probes succeed, wkr.state changes to
158 // StateIdle/StateRunning.
159 //
160 // If probeRunning succeeds, wkr.running is updated. (This means
161 // wkr.running might be non-empty even in StateUnknown, if the boot
162 // probe failed.)
163 //
164 // probeAndUpdate should be called in a new goroutine.
165 func (wkr *worker) probeAndUpdate() {
166         wkr.mtx.Lock()
167         updated := wkr.updated
168         initialState := wkr.state
169         wkr.mtx.Unlock()
170
171         var (
172                 booted   bool
173                 ctrUUIDs []string
174                 ok       bool
175                 stderr   []byte
176         )
177
178         switch initialState {
179         case StateShutdown:
180                 return
181         case StateIdle, StateRunning:
182                 booted = true
183         case StateUnknown, StateBooting:
184         default:
185                 panic(fmt.Sprintf("unknown state %s", initialState))
186         }
187
188         if !booted {
189                 booted, stderr = wkr.probeBooted()
190                 if !booted {
191                         // Pretend this probe succeeded if another
192                         // concurrent attempt succeeded.
193                         wkr.mtx.Lock()
194                         booted = wkr.state == StateRunning || wkr.state == StateIdle
195                         wkr.mtx.Unlock()
196                 } else {
197                         wkr.logger.Info("instance booted; will try probeRunning")
198                 }
199         }
200         if booted || wkr.state == StateUnknown {
201                 ctrUUIDs, ok, stderr = wkr.probeRunning()
202         }
203         logger := wkr.logger.WithField("stderr", string(stderr))
204         wkr.mtx.Lock()
205         defer wkr.mtx.Unlock()
206         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
207                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
208                         // Skip the logging noise if shutdown was
209                         // initiated during probe.
210                         return
211                 }
212                 dur := time.Since(wkr.probed)
213                 logger := logger.WithFields(logrus.Fields{
214                         "Duration": dur,
215                         "State":    wkr.state,
216                 })
217                 if !booted {
218                         // While we're polling the VM to see if it's
219                         // finished booting, failures are not
220                         // noteworthy, so we log at Debug level.
221                         logger.Debug("new instance not responding")
222                 } else {
223                         logger.Info("instance not responding")
224                 }
225                 wkr.shutdownIfBroken(dur)
226                 return
227         }
228
229         updateTime := time.Now()
230         wkr.probed = updateTime
231
232         if updated != wkr.updated {
233                 // Worker was updated after the probe began, so
234                 // wkr.running might have a container UUID that was
235                 // not yet running when ctrUUIDs was generated. Leave
236                 // wkr.running alone and wait for the next probe to
237                 // catch up on any changes.
238                 return
239         }
240
241         if len(ctrUUIDs) > 0 {
242                 wkr.busy = updateTime
243                 wkr.lastUUID = ctrUUIDs[0]
244         } else if len(wkr.running) > 0 {
245                 // Actual last-busy time was sometime between wkr.busy
246                 // and now. Now is the earliest opportunity to take
247                 // advantage of the non-busy state, though.
248                 wkr.busy = updateTime
249         }
250         changed := false
251
252         // Build a new "running" map. Set changed=true if it differs
253         // from the existing map (wkr.running) to ensure the scheduler
254         // gets notified below.
255         running := map[string]struct{}{}
256         for _, uuid := range ctrUUIDs {
257                 running[uuid] = struct{}{}
258                 if _, ok := wkr.running[uuid]; !ok {
259                         if _, ok := wkr.starting[uuid]; !ok {
260                                 // We didn't start it -- it must have
261                                 // been started by a previous
262                                 // dispatcher process.
263                                 logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
264                         }
265                         changed = true
266                 }
267         }
268         for uuid := range wkr.running {
269                 if _, ok := running[uuid]; !ok {
270                         logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
271                         wkr.wp.notifyExited(uuid, updateTime)
272                         changed = true
273                 }
274         }
275
276         // Update state if this was the first successful boot-probe.
277         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
278                 // Note: this will change again below if
279                 // len(wkr.starting)+len(wkr.running) > 0.
280                 wkr.state = StateIdle
281                 changed = true
282         }
283
284         // If wkr.state and wkr.running aren't changing then there's
285         // no need to log anything, notify the scheduler, move state
286         // back and forth between idle/running, etc.
287         if !changed {
288                 return
289         }
290
291         // Log whenever a run-probe reveals crunch-run processes
292         // appearing/disappearing before boot-probe succeeds.
293         if wkr.state == StateUnknown && len(running) != len(wkr.running) {
294                 logger.WithFields(logrus.Fields{
295                         "RunningContainers": len(running),
296                         "State":             wkr.state,
297                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
298         }
299
300         wkr.running = running
301         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
302                 wkr.state = StateRunning
303         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
304                 wkr.state = StateIdle
305         }
306         wkr.updated = updateTime
307         if booted && (initialState == StateUnknown || initialState == StateBooting) {
308                 logger.WithFields(logrus.Fields{
309                         "RunningContainers": len(running),
310                         "State":             wkr.state,
311                 }).Info("probes succeeded, instance is in service")
312         }
313         go wkr.wp.notify()
314 }
315
316 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
317         cmd := "crunch-run --list"
318         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
319         if err != nil {
320                 wkr.logger.WithFields(logrus.Fields{
321                         "Command": cmd,
322                         "stdout":  string(stdout),
323                         "stderr":  string(stderr),
324                 }).WithError(err).Warn("probe failed")
325                 return nil, false, stderr
326         }
327         stdout = bytes.TrimRight(stdout, "\n")
328         if len(stdout) == 0 {
329                 return nil, true, stderr
330         }
331         return strings.Split(string(stdout), "\n"), true, stderr
332 }
333
334 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
335         cmd := wkr.wp.bootProbeCommand
336         if cmd == "" {
337                 cmd = "true"
338         }
339         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
340         logger := wkr.logger.WithFields(logrus.Fields{
341                 "Command": cmd,
342                 "stdout":  string(stdout),
343                 "stderr":  string(stderr),
344         })
345         if err != nil {
346                 logger.WithError(err).Debug("boot probe failed")
347                 return false, stderr
348         }
349         logger.Info("boot probe succeeded")
350         return true, stderr
351 }
352
353 // caller must have lock.
354 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
355         if wkr.idleBehavior == IdleBehaviorHold {
356                 // Never shut down.
357                 return
358         }
359         label, threshold := "", wkr.wp.timeoutProbe
360         if wkr.state == StateUnknown || wkr.state == StateBooting {
361                 label, threshold = "new ", wkr.wp.timeoutBooting
362         }
363         if dur < threshold {
364                 return
365         }
366         wkr.logger.WithFields(logrus.Fields{
367                 "Duration": dur,
368                 "Since":    wkr.probed,
369                 "State":    wkr.state,
370         }).Warnf("%sinstance unresponsive, shutting down", label)
371         wkr.shutdown()
372 }
373
374 // caller must have lock.
375 func (wkr *worker) shutdownIfIdle() bool {
376         if wkr.idleBehavior == IdleBehaviorHold {
377                 // Never shut down.
378                 return false
379         }
380         age := time.Since(wkr.busy)
381
382         old := age >= wkr.wp.timeoutIdle
383         draining := wkr.idleBehavior == IdleBehaviorDrain
384         shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
385                 (draining && wkr.state == StateBooting)
386         if !shouldShutdown {
387                 return false
388         }
389
390         wkr.logger.WithFields(logrus.Fields{
391                 "State":        wkr.state,
392                 "Age":          age,
393                 "IdleBehavior": wkr.idleBehavior,
394         }).Info("shutdown idle worker")
395         wkr.shutdown()
396         return true
397 }
398
399 // caller must have lock.
400 func (wkr *worker) shutdown() {
401         now := time.Now()
402         wkr.updated = now
403         wkr.destroyed = now
404         wkr.state = StateShutdown
405         go wkr.wp.notify()
406         go func() {
407                 err := wkr.instance.Destroy()
408                 if err != nil {
409                         wkr.logger.WithError(err).Warn("shutdown failed")
410                         return
411                 }
412         }()
413 }
414
415 // Save worker tags to cloud provider metadata, if they don't already
416 // match. Caller must have lock.
417 func (wkr *worker) saveTags() {
418         instance := wkr.instance
419         have := instance.Tags()
420         want := cloud.InstanceTags{
421                 tagKeyInstanceType: wkr.instType.Name,
422                 tagKeyIdleBehavior: string(wkr.idleBehavior),
423         }
424         go func() {
425                 for k, v := range want {
426                         if v == have[k] {
427                                 continue
428                         }
429                         err := instance.SetTags(want)
430                         if err != nil {
431                                 wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
432                         }
433                         break
434
435                 }
436         }()
437 }