[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "fmt"
10         "path/filepath"
11         "strings"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/lib/cloud"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/stats"
18         "github.com/sirupsen/logrus"
19 )
20
21 const (
22         // TODO: configurable
23         maxPingFailTime = 10 * time.Minute
24 )
25
26 // State indicates whether a worker is available to do work, and (if
27 // not) whether/when it is expected to become ready.
28 type State int
29
30 const (
31         StateUnknown  State = iota // might be running a container already
32         StateBooting               // instance is booting
33         StateIdle                  // instance booted, no containers are running
34         StateRunning               // instance is running one or more containers
35         StateShutdown              // worker has stopped monitoring the instance
36 )
37
38 var stateString = map[State]string{
39         StateUnknown:  "unknown",
40         StateBooting:  "booting",
41         StateIdle:     "idle",
42         StateRunning:  "running",
43         StateShutdown: "shutdown",
44 }
45
46 // String implements fmt.Stringer.
47 func (s State) String() string {
48         return stateString[s]
49 }
50
51 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
52 // map[State]anything uses the state's string representation.
53 func (s State) MarshalText() ([]byte, error) {
54         return []byte(stateString[s]), nil
55 }
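
// A small illustration (hypothetical snippet, not part of this package)
// of why MarshalText matters here: encoding/json uses it for map keys,
// so a map keyed by State serializes with the readable state names
// instead of integer values:
//
//      counts := map[State]int{StateIdle: 3, StateRunning: 1}
//      buf, _ := json.Marshal(counts) // => {"idle":3,"running":1}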
56
57 // BootOutcome is the result of a worker boot. It is used as a label in a metric.
58 type BootOutcome string
59
60 const (
61         BootOutcomeFailed      BootOutcome = "failure"
62         BootOutcomeSucceeded   BootOutcome = "success"
63         BootOutcomeAborted     BootOutcome = "aborted"
64         BootOutcomeDisappeared BootOutcome = "disappeared"
65 )
66
67 var validBootOutcomes = map[BootOutcome]bool{
68         BootOutcomeFailed:      true,
69         BootOutcomeSucceeded:   true,
70         BootOutcomeAborted:     true,
71         BootOutcomeDisappeared: true,
72 }
73
74 // IdleBehavior indicates the behavior desired when a node becomes idle.
75 type IdleBehavior string
76
77 const (
78         IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
79         IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
80         IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
81 )
82
83 var validIdleBehavior = map[IdleBehavior]bool{
84         IdleBehaviorRun:   true,
85         IdleBehaviorHold:  true,
86         IdleBehaviorDrain: true,
87 }
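
// Sketch (hypothetical caller, not part of this file) of how an
// operator-supplied value would be checked against validIdleBehavior
// before being applied:
//
//      want := IdleBehavior(paramValue) // e.g. from an admin API request
//      if !validIdleBehavior[want] {
//              return fmt.Errorf("invalid idle behavior %q", want)
//      }
//      wkr.setIdleBehavior(want) // caller must hold the pool's lock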
88
89 type worker struct {
90         logger   logrus.FieldLogger
91         executor Executor
92         wp       *Pool
93
94         mtx                 sync.Locker // must be wp's Locker.
95         state               State
96         idleBehavior        IdleBehavior
97         instance            cloud.Instance
98         instType            arvados.InstanceType
99         vcpus               int64
100         memory              int64
101         appeared            time.Time
102         probed              time.Time
103         updated             time.Time
104         busy                time.Time
105         destroyed           time.Time
106         firstSSHConnection  time.Time
107         lastUUID            string
108         running             map[string]*remoteRunner // remember to update state idle<->running when this changes
109         starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
110         probing             chan struct{}
111         bootOutcomeReported bool
112         timeToReadyReported bool
113 }
114
115 func (wkr *worker) onUnkillable(uuid string) {
116         wkr.mtx.Lock()
117         defer wkr.mtx.Unlock()
118         logger := wkr.logger.WithField("ContainerUUID", uuid)
119         if wkr.idleBehavior == IdleBehaviorHold {
120                 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
121                 return
122         }
123         logger.Warn("unkillable container, draining worker")
124         wkr.setIdleBehavior(IdleBehaviorDrain)
125 }
126
127 func (wkr *worker) onKilled(uuid string) {
128         wkr.mtx.Lock()
129         defer wkr.mtx.Unlock()
130         wkr.closeRunner(uuid)
131         go wkr.wp.notify()
132 }
133
134 // caller must have lock.
135 func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
136         if wkr.bootOutcomeReported {
137                 return
138         }
139         if wkr.wp.mBootOutcomes != nil {
140                 wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
141         }
142         wkr.bootOutcomeReported = true
143 }
144
145 // caller must have lock.
146 func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
147         if wkr.timeToReadyReported {
148                 return
149         }
150         if wkr.wp.mTimeToReadyForContainer != nil {
151                 wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
152         }
153         wkr.timeToReadyReported = true
154 }
155
156 // caller must have lock.
157 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
158         wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
159         wkr.idleBehavior = idleBehavior
160         wkr.saveTags()
161         wkr.shutdownIfIdle()
162 }
163
164 // caller must have lock.
165 func (wkr *worker) startContainer(ctr arvados.Container) {
166         logger := wkr.logger.WithFields(logrus.Fields{
167                 "ContainerUUID": ctr.UUID,
168                 "Priority":      ctr.Priority,
169         })
170         logger.Debug("starting container")
171         rr := newRemoteRunner(ctr.UUID, wkr)
172         wkr.starting[ctr.UUID] = rr
173         if wkr.state != StateRunning {
174                 wkr.state = StateRunning
175                 go wkr.wp.notify()
176         }
177         go func() {
178                 rr.Start()
179                 wkr.mtx.Lock()
180                 defer wkr.mtx.Unlock()
181                 now := time.Now()
182                 wkr.updated = now
183                 wkr.busy = now
184                 delete(wkr.starting, ctr.UUID)
185                 wkr.running[ctr.UUID] = rr
186                 wkr.lastUUID = ctr.UUID
187         }()
188 }
189
190 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
191 // for the worker's current state. If a previous probe is still
192 // running, it does nothing.
193 //
194 // It should be called in a new goroutine.
195 func (wkr *worker) ProbeAndUpdate() {
196         select {
197         case wkr.probing <- struct{}{}:
198                 wkr.probeAndUpdate()
199                 <-wkr.probing
200         default:
201                 wkr.logger.Debug("still waiting for last probe to finish")
202         }
203 }
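
// The select above is the usual Go "at most one in flight" idiom:
// assuming wkr.probing is a buffered channel with capacity 1
// (initialized elsewhere in this package), the send only succeeds when
// no probe is currently running. A standalone sketch of the pattern:
//
//      gate := make(chan struct{}, 1)
//      select {
//      case gate <- struct{}{}:
//              doWork() // hypothetical
//              <-gate
//      default:
//              // previous call still in progress; skip this one
//      }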
204
205 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
206 // updates state accordingly.
207 //
208 // In StateUnknown: Call both probeBooted and probeRunning.
209 // In StateBooting: Call probeBooted; if successful, call probeRunning.
210 // In StateRunning: Call probeRunning.
211 // In StateIdle: Call probeRunning.
212 // In StateShutdown: Do nothing.
213 //
214 // If both probes succeed, wkr.state changes to
215 // StateIdle/StateRunning.
216 //
217 // If probeRunning succeeds, wkr.running is updated. (This means
218 // wkr.running might be non-empty even in StateUnknown, if the boot
219 // probe failed.)
220 //
221 // probeAndUpdate should be called in a new goroutine.
222 func (wkr *worker) probeAndUpdate() {
223         wkr.mtx.Lock()
224         updated := wkr.updated
225         initialState := wkr.state
226         wkr.mtx.Unlock()
227
228         var (
229                 booted   bool
230                 ctrUUIDs []string
231                 ok       bool
232                 stderr   []byte // from probeBooted
233         )
234
235         switch initialState {
236         case StateShutdown:
237                 return
238         case StateIdle, StateRunning:
239                 booted = true
240         case StateUnknown, StateBooting:
241         default:
242                 panic(fmt.Sprintf("unknown state %s", initialState))
243         }
244
245         probeStart := time.Now()
246         logger := wkr.logger.WithField("ProbeStart", probeStart)
247
248         if !booted {
249                 booted, stderr = wkr.probeBooted()
250                 if !booted {
251                         // Pretend this probe succeeded if another
252                         // concurrent attempt succeeded.
253                         wkr.mtx.Lock()
254                         booted = wkr.state == StateRunning || wkr.state == StateIdle
255                         wkr.mtx.Unlock()
256                 }
257                 if booted {
258                         logger.Info("instance booted; will try probeRunning")
259                 }
260         }
261         reportedBroken := false
262         if booted || wkr.state == StateUnknown {
263                 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
264         }
265         wkr.mtx.Lock()
266         defer wkr.mtx.Unlock()
267         if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
268                 logger.Info("probe reported broken instance")
269                 wkr.reportBootOutcome(BootOutcomeFailed)
270                 wkr.setIdleBehavior(IdleBehaviorDrain)
271         }
272         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
273                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
274                         // Skip the logging noise if shutdown was
275                         // initiated during probe.
276                         return
277                 }
278                 // Using the start time of the probe as the timeout
279                 // threshold ensures we always initiate at least one
280                 // probe attempt after the boot/probe timeout expires
281                 // (otherwise, a slow probe failure could cause us to
282                 // shut down an instance even though it did in fact
283                 // boot/recover before the timeout expired).
284                 dur := probeStart.Sub(wkr.probed)
285                 if wkr.shutdownIfBroken(dur) {
286                         // stderr from failed run-probes will have
287                         // been logged already, but boot-probe
288                         // failures are normal so they are logged only
289                         // at Debug level. This is our chance to log
290                         // some evidence about why the node never
291                         // booted, even in non-debug mode.
292                         if !booted {
293                                 wkr.reportBootOutcome(BootOutcomeFailed)
294                                 logger.WithFields(logrus.Fields{
295                                         "Duration": dur,
296                                         "stderr":   string(stderr),
297                                 }).Info("boot failed")
298                         }
299                 }
300                 return
301         }
302
303         updateTime := time.Now()
304         wkr.probed = updateTime
305
306         if updated != wkr.updated {
307                 // Worker was updated after the probe began, so
308                 // wkr.running might have a container UUID that was
309                 // not yet running when ctrUUIDs was generated. Leave
310                 // wkr.running alone and wait for the next probe to
311                 // catch up on any changes.
312                 return
313         }
314
315         if len(ctrUUIDs) > 0 {
316                 wkr.busy = updateTime
317                 wkr.lastUUID = ctrUUIDs[0]
318         } else if len(wkr.running) > 0 {
319                 // Actual last-busy time was sometime between wkr.busy
320                 // and now. Now is the earliest opportunity to take
321                 // advantage of the non-busy state, though.
322                 wkr.busy = updateTime
323         }
324
325         changed := wkr.updateRunning(ctrUUIDs)
326
327         // Update state if this was the first successful boot-probe.
328         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
329                 if wkr.state == StateBooting {
330                         wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
331                 }
332                 // Note: this will change again below if
333                 // len(wkr.starting)+len(wkr.running) > 0.
334                 wkr.state = StateIdle
335                 changed = true
336         }
337
338         // If wkr.state and wkr.running aren't changing then there's
339         // no need to log anything, notify the scheduler, move state
340         // back and forth between idle/running, etc.
341         if !changed {
342                 return
343         }
344
345         // Log whenever a run-probe reveals crunch-run processes
346         // appearing/disappearing before boot-probe succeeds.
347         if wkr.state == StateUnknown {
348                 logger.WithFields(logrus.Fields{
349                         "RunningContainers": len(wkr.running),
350                         "State":             wkr.state,
351                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
352         }
353
354         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
355                 wkr.state = StateRunning
356         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
357                 wkr.state = StateIdle
358         }
359         wkr.updated = updateTime
360         if booted && (initialState == StateUnknown || initialState == StateBooting) {
361                 wkr.reportBootOutcome(BootOutcomeSucceeded)
362                 logger.WithFields(logrus.Fields{
363                         "RunningContainers": len(wkr.running),
364                         "State":             wkr.state,
365                 }).Info("probes succeeded, instance is in service")
366         }
367         go wkr.wp.notify()
368 }
369
370 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
371         cmd := wkr.wp.runnerCmd + " --list"
372         if u := wkr.instance.RemoteUser(); u != "root" {
373                 cmd = "sudo " + cmd
374         }
375         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
376         if err != nil {
377                 wkr.logger.WithFields(logrus.Fields{
378                         "Command": cmd,
379                         "stdout":  string(stdout),
380                         "stderr":  string(stderr),
381                 }).WithError(err).Warn("probe failed")
382                 return
383         }
384         ok = true
385         for _, s := range strings.Split(string(stdout), "\n") {
386                 if s == "broken" {
387                         reportsBroken = true
388                 } else if s != "" {
389                         running = append(running, s)
390                 }
391         }
392         return
393 }
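
// For reference, the output parsed above is one entry per line on
// stdout, e.g. (UUIDs are illustrative):
//
//      zzzzz-dz642-aaaaaaaaaaaaaaa
//      zzzzz-dz642-bbbbbbbbbbbbbbb
//      broken
//
// The literal line "broken" sets reportsBroken; every other non-empty
// line is treated as the UUID of a running container.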
394
395 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
396         cmd := wkr.wp.bootProbeCommand
397         if cmd == "" {
398                 cmd = "true"
399         }
400         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
401         logger := wkr.logger.WithFields(logrus.Fields{
402                 "Command": cmd,
403                 "stdout":  string(stdout),
404                 "stderr":  string(stderr),
405         })
406         if err != nil {
407                 logger.WithError(err).Debug("boot probe failed")
408                 return false, stderr
409         }
410         logger.Info("boot probe succeeded")
411         if err = wkr.wp.loadRunnerData(); err != nil {
412                 wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
413                 return false, stderr
414         } else if len(wkr.wp.runnerData) == 0 {
415                 // Assume crunch-run is already installed
416         } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
417                 wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
418                 return false, stderr2
419         } else {
420                 stderr = append(stderr, stderr2...)
421         }
422         return true, stderr
423 }
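
// The boot probe command comes from the pool configuration
// (wkr.wp.bootProbeCommand); an empty value degenerates to "true",
// i.e. the instance counts as booted as soon as SSH can run a command.
// A typical value (illustrative, not prescribed by this package) is a
// command that only exits 0 once the node's startup scripts have
// finished, for example:
//
//      systemctl is-system-running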
424
425 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
426         hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
427         dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
428         logger := wkr.logger.WithFields(logrus.Fields{
429                 "hash": hash,
430                 "path": wkr.wp.runnerCmd,
431         })
432
433         stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
434         if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
435                 logger.Info("runner binary already exists on worker, with correct hash")
436                 return
437         }
438
439         // Note touch+chmod come before writing data, to avoid the
440         // possibility of md5 being correct while file mode is
441         // incorrect.
442         cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
443         if wkr.instance.RemoteUser() != "root" {
444                 cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
445         }
446         logger.WithField("cmd", cmd).Info("installing runner binary on worker")
447         stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
448         return
449 }
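
// With a hypothetical runnerCmd of /var/lib/arvados/crunch-run, the
// install command assembled above expands to roughly:
//
//      set -e; dstdir="/var/lib/arvados/"; dstfile="/var/lib/arvados/crunch-run"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"
//
// wrapped in sudo sh -c '...' when the remote user is not root, with
// the runner binary streamed to the remote command's stdin.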
450
451 // caller must have lock.
452 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
453         if wkr.idleBehavior == IdleBehaviorHold {
454                 // Never shut down.
455                 return false
456         }
457         label, threshold := "", wkr.wp.timeoutProbe
458         if wkr.state == StateUnknown || wkr.state == StateBooting {
459                 label, threshold = "new ", wkr.wp.timeoutBooting
460         }
461         if dur < threshold {
462                 return false
463         }
464         wkr.logger.WithFields(logrus.Fields{
465                 "Duration": dur,
466                 "Since":    wkr.probed,
467                 "State":    wkr.state,
468         }).Warnf("%sinstance unresponsive, shutting down", label)
469         wkr.shutdown()
470         return true
471 }
472
473 // Returns true if the instance is eligible for shutdown: either it's
474 // been idle too long, or idleBehavior=Drain and nothing is running.
475 //
476 // caller must have lock.
477 func (wkr *worker) eligibleForShutdown() bool {
478         if wkr.idleBehavior == IdleBehaviorHold {
479                 return false
480         }
481         draining := wkr.idleBehavior == IdleBehaviorDrain
482         switch wkr.state {
483         case StateBooting:
484                 return draining
485         case StateIdle:
486                 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
487         case StateRunning:
488                 if !draining {
489                         return false
490                 }
491                 for _, rr := range wkr.running {
492                         if !rr.givenup {
493                                 return false
494                         }
495                 }
496                 for _, rr := range wkr.starting {
497                         if !rr.givenup {
498                                 return false
499                         }
500                 }
501                 // draining, and all remaining runners are just trying
502                 // to force-kill their crunch-run procs
503                 return true
504         default:
505                 return false
506         }
507 }
508
509 // caller must have lock.
510 func (wkr *worker) shutdownIfIdle() bool {
511         if !wkr.eligibleForShutdown() {
512                 return false
513         }
514         wkr.logger.WithFields(logrus.Fields{
515                 "State":        wkr.state,
516                 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
517                 "IdleBehavior": wkr.idleBehavior,
518         }).Info("shutdown worker")
519         wkr.reportBootOutcome(BootOutcomeAborted)
520         wkr.shutdown()
521         return true
522 }
523
524 // caller must have lock.
525 func (wkr *worker) shutdown() {
526         now := time.Now()
527         wkr.updated = now
528         wkr.destroyed = now
529         wkr.state = StateShutdown
530         go wkr.wp.notify()
531         go func() {
532                 err := wkr.instance.Destroy()
533                 if err != nil {
534                         wkr.logger.WithError(err).Warn("shutdown failed")
535                         return
536                 }
537         }()
538 }
539
540 // Save worker tags to cloud provider metadata, if they don't already
541 // match. Caller must have lock.
542 func (wkr *worker) saveTags() {
543         instance := wkr.instance
544         tags := instance.Tags()
545         update := cloud.InstanceTags{
546                 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
547                 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
548         }
549         save := false
550         for k, v := range update {
551                 if tags[k] != v {
552                         tags[k] = v
553                         save = true
554                 }
555         }
556         if save {
557                 go func() {
558                         err := instance.SetTags(tags)
559                         if err != nil {
560                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
561                         }
562                 }()
563         }
564 }
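
// For example (values are illustrative), a worker of instance type
// "m4.large" with IdleBehavior=run ends up tagged
//
//      <tagKeyPrefix><tagKeyInstanceType> = "m4.large"
//      <tagKeyPrefix><tagKeyIdleBehavior> = "run"
//
// and SetTags is only invoked when at least one of those values differs
// from the tags the instance already carries.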
565
566 func (wkr *worker) Close() {
567         // This might take time, so do it after unlocking mtx.
568         defer wkr.executor.Close()
569
570         wkr.mtx.Lock()
571         defer wkr.mtx.Unlock()
572         for uuid, rr := range wkr.running {
573                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
574                 rr.Close()
575         }
576         for uuid, rr := range wkr.starting {
577                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
578                 rr.Close()
579         }
580 }
581
582 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
583 // probe. Returns true if anything was added or removed.
584 //
585 // Caller must have lock.
586 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
587         alive := map[string]bool{}
588         for _, uuid := range ctrUUIDs {
589                 alive[uuid] = true
590                 if _, ok := wkr.running[uuid]; ok {
591                         // unchanged
592                 } else if rr, ok := wkr.starting[uuid]; ok {
593                         wkr.running[uuid] = rr
594                         delete(wkr.starting, uuid)
595                         changed = true
596                 } else {
597                         // We didn't start it -- it must have been
598                         // started by a previous dispatcher process.
599                         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
600                         wkr.running[uuid] = newRemoteRunner(uuid, wkr)
601                         changed = true
602                 }
603         }
604         for uuid := range wkr.running {
605                 if !alive[uuid] {
606                         wkr.closeRunner(uuid)
607                         changed = true
608                 }
609         }
610         return
611 }
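
// Worked example: if wkr.running = {A}, wkr.starting = {B}, and a probe
// reports ctrUUIDs = [B, C], then B is moved from starting to running,
// C is adopted with a new remoteRunner (it must have been started by a
// previous dispatcher process), A is closed via closeRunner, and
// updateRunning returns changed=true.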
612
613 // caller must have lock.
614 func (wkr *worker) closeRunner(uuid string) {
615         rr := wkr.running[uuid]
616         if rr == nil {
617                 return
618         }
619         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
620         delete(wkr.running, uuid)
621         rr.Close()
622
623         now := time.Now()
624         wkr.updated = now
625         wkr.wp.exited[uuid] = now
626         if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
627                 wkr.state = StateIdle
628         }
629 }