// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"fmt"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

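// A worker wraps a single cloud.Instance and tracks the dispatcher's
// view of it: lifecycle state, idle behavior, probe/busy timestamps,
// and the crunch-run processes that are starting or running on it.
// Mutable fields are protected by mtx, which is the pool's lock.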
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker.
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	vcpus               int64
	memory              int64
	appeared            time.Time
	probed              time.Time
	updated             time.Time
	busy                time.Time
	destroyed           time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing             chan struct{}
	bootOutcomeReported bool
}

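// onUnkillable is called when a container's crunch-run process cannot
// be killed. Unless the worker is held, it is drained so the instance
// is eventually shut down rather than given more work.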
func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}

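// onKilled is called after a container's crunch-run process has been
// killed. It stops tracking the runner and wakes up the scheduler.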
func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}

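// reportBootOutcome records the given boot outcome in the pool's
// metrics, at most once per worker.
//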
// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
		return
	}
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	}
	wkr.bootOutcomeReported = true
}

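// setIdleBehavior updates the worker's idle behavior, saves it to the
// instance's tags, and shuts the worker down right away if the new
// behavior makes it eligible for shutdown.
//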
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}

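// startContainer starts a crunch-run process for the given container
// on this worker, in a new goroutine, and moves the worker to
// StateRunning if it isn't there already.
//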
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				wkr.reportBootOutcome(BootOutcomeFailed)
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

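// probeRunning runs the runner command with --list on the instance to
// find out which containers are running (and whether the instance
// reports itself broken). ok is false if the probe command itself
// failed.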
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return
	}
	ok = true
	for _, s := range strings.Split(string(stdout), "\n") {
		if s == "broken" {
			reportsBroken = true
		} else if s != "" {
			running = append(running, s)
		}
	}
	return
}

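// probeBooted runs the configured boot probe command on the instance.
// If the probe succeeds, it also ensures the runner binary is
// installed (unless the pool has no runner binary to deploy, in which
// case crunch-run is assumed to be preinstalled).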
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return false, stderr
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
		return false, stderr2
	} else {
		stderr = append(stderr, stderr2...)
	}
	return true, stderr
}

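// copyRunnerData installs the runner binary at wkr.wp.runnerCmd on the
// instance, unless a file with the expected md5 hash is already
// present there.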
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,
	})

	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")
		return
	}

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	}
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
	return
}

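// shutdownIfBroken shuts down the worker if it has been unresponsive
// for longer than the applicable boot/probe timeout, unless
// idleBehavior is Hold. It returns true if a shutdown was initiated.
//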
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return true
}

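// shutdown marks the worker as destroyed, notifies the scheduler, and
// destroys the cloud instance in a separate goroutine.
//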
// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}

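// Close releases the worker's executor after logging and closing any
// runners that are still being tracked; the corresponding crunch-run
// processes on the instance are abandoned rather than killed.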
func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}

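// closeRunner stops tracking the given container: it removes the
// runner from wkr.running, records the exit time in the pool, and
// moves the worker back to StateIdle if nothing else is running or
// starting.
//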
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}