Merge branch '16834-stale-run-lock'
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "fmt"
10         "path/filepath"
11         "strings"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/lib/cloud"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/stats"
18         "github.com/sirupsen/logrus"
19 )
20
21 const (
22         // TODO: configurable
23         maxPingFailTime = 10 * time.Minute
24 )
25
26 // State indicates whether a worker is available to do work, and (if
27 // not) whether/when it is expected to become ready.
28 type State int
29
30 const (
31         StateUnknown  State = iota // might be running a container already
32         StateBooting               // instance is booting
33         StateIdle                  // instance booted, no containers are running
34         StateRunning               // instance is running one or more containers
35         StateShutdown              // worker has stopped monitoring the instance
36 )
37
38 var stateString = map[State]string{
39         StateUnknown:  "unknown",
40         StateBooting:  "booting",
41         StateIdle:     "idle",
42         StateRunning:  "running",
43         StateShutdown: "shutdown",
44 }
45
46 // String implements fmt.Stringer.
47 func (s State) String() string {
48         return stateString[s]
49 }
50
51 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
52 // map[State]anything uses the state's string representation.
53 func (s State) MarshalText() ([]byte, error) {
54         return []byte(stateString[s]), nil
55 }
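
   // Illustrative sketch (not used elsewhere in this package): because
   // State implements encoding.TextMarshaler, a map keyed by State
   // marshals to readable JSON keys. Assuming "encoding/json" were
   // imported:
   //
   //      counts := map[State]int{StateIdle: 3, StateRunning: 1}
   //      buf, _ := json.Marshal(counts)
   //      // buf == []byte(`{"idle":3,"running":1}`)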
56
57 // BootOutcome is the result of a worker boot. It is used as a label in a metric.
58 type BootOutcome string
59
60 const (
61         BootOutcomeFailed      BootOutcome = "failure"
62         BootOutcomeSucceeded   BootOutcome = "success"
63         BootOutcomeAborted     BootOutcome = "aborted"
64         BootOutcomeDisappeared BootOutcome = "disappeared"
65 )
66
67 var validBootOutcomes = map[BootOutcome]bool{
68         BootOutcomeFailed:      true,
69         BootOutcomeSucceeded:   true,
70         BootOutcomeAborted:     true,
71         BootOutcomeDisappeared: true,
72 }
73
74 // IdleBehavior indicates the behavior desired when a node becomes idle.
75 type IdleBehavior string
76
77 const (
78         IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
79         IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
80         IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
81 )
82
83 var validIdleBehavior = map[IdleBehavior]bool{
84         IdleBehaviorRun:   true,
85         IdleBehaviorHold:  true,
86         IdleBehaviorDrain: true,
87 }
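
   // Caller-side validation sketch (hypothetical helper, not part of
   // this package): reject unknown values before applying them.
   //
   //      func parseIdleBehavior(s string) (IdleBehavior, error) {
   //              ib := IdleBehavior(s)
   //              if !validIdleBehavior[ib] {
   //                      return "", fmt.Errorf("invalid idle behavior %q", s)
   //              }
   //              return ib, nil
   //      }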
88
89 type worker struct {
90         logger   logrus.FieldLogger
91         executor Executor
92         wp       *Pool
93
94         mtx                 sync.Locker // must be wp's Locker.
95         state               State
96         idleBehavior        IdleBehavior
97         instance            cloud.Instance
98         instType            arvados.InstanceType
99         vcpus               int64
100         memory              int64
101         appeared            time.Time
102         probed              time.Time
103         updated             time.Time
104         busy                time.Time
105         destroyed           time.Time
106         firstSSHConnection  time.Time
107         lastUUID            string
108         running             map[string]*remoteRunner // remember to update state idle<->running when this changes
109         starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
110         probing             chan struct{}
111         bootOutcomeReported bool
112         timeToReadyReported bool
113         staleRunLockSince   time.Time
114 }
115
116 func (wkr *worker) onUnkillable(uuid string) {
117         wkr.mtx.Lock()
118         defer wkr.mtx.Unlock()
119         logger := wkr.logger.WithField("ContainerUUID", uuid)
120         if wkr.idleBehavior == IdleBehaviorHold {
121                 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
122                 return
123         }
124         logger.Warn("unkillable container, draining worker")
125         wkr.setIdleBehavior(IdleBehaviorDrain)
126 }
127
128 func (wkr *worker) onKilled(uuid string) {
129         wkr.mtx.Lock()
130         defer wkr.mtx.Unlock()
131         wkr.closeRunner(uuid)
132         go wkr.wp.notify()
133 }
134
135 // caller must have lock.
136 func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
137         if wkr.bootOutcomeReported {
138                 return
139         }
140         if wkr.wp.mBootOutcomes != nil {
141                 wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
142         }
143         wkr.bootOutcomeReported = true
144 }
145
146 // caller must have lock.
147 func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
148         if wkr.timeToReadyReported {
149                 return
150         }
151         if wkr.wp.mTimeToReadyForContainer != nil {
152                 wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
153         }
154         wkr.timeToReadyReported = true
155 }
156
157 // caller must have lock.
158 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
159         wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
160         wkr.idleBehavior = idleBehavior
161         wkr.saveTags()
162         wkr.shutdownIfIdle()
163 }
164
165 // caller must have lock.
166 func (wkr *worker) startContainer(ctr arvados.Container) {
167         logger := wkr.logger.WithFields(logrus.Fields{
168                 "ContainerUUID": ctr.UUID,
169                 "Priority":      ctr.Priority,
170         })
171         logger.Debug("starting container")
172         rr := newRemoteRunner(ctr.UUID, wkr)
173         wkr.starting[ctr.UUID] = rr
174         if wkr.state != StateRunning {
175                 wkr.state = StateRunning
176                 go wkr.wp.notify()
177         }
178         go func() {
179                 rr.Start()
180                 wkr.mtx.Lock()
181                 defer wkr.mtx.Unlock()
182                 now := time.Now()
183                 wkr.updated = now
184                 wkr.busy = now
185                 delete(wkr.starting, ctr.UUID)
186                 wkr.running[ctr.UUID] = rr
187                 wkr.lastUUID = ctr.UUID
188         }()
189 }
190
191 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
192 // for the worker's current state. If a previous probe is still
193 // running, it does nothing.
194 //
195 // It should be called in a new goroutine.
196 func (wkr *worker) ProbeAndUpdate() {
197         select {
198         case wkr.probing <- struct{}{}:
199                 wkr.probeAndUpdate()
200                 <-wkr.probing
201         default:
202                 wkr.logger.Debug("still waiting for last probe to finish")
203         }
204 }
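
    // The select above acts as a non-blocking try-lock: the send into
    // the buffered probing channel succeeds only while no other probe
    // holds its slot. A minimal standalone sketch of the same idiom
    // (hypothetical names, not part of this package):
    //
    //      sem := make(chan struct{}, 1)
    //      tryProbe := func(probe func()) bool {
    //              select {
    //              case sem <- struct{}{}:
    //                      defer func() { <-sem }()
    //                      probe()
    //                      return true
    //              default:
    //                      return false // a probe is already in flight
    //              }
    //      }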
205
206 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
207 // updates state accordingly.
208 //
209 // In StateUnknown: Call both probeBooted and probeRunning.
210 // In StateBooting: Call probeBooted; if successful, call probeRunning.
211 // In StateRunning: Call probeRunning.
212 // In StateIdle: Call probeRunning.
213 // In StateShutdown: Do nothing.
214 //
215 // If both probes succeed, wkr.state changes to
216 // StateIdle/StateRunning.
217 //
218 // If probeRunning succeeds, wkr.running is updated. (This means
219 // wkr.running might be non-empty even in StateUnknown, if the boot
220 // probe failed.)
221 //
222 // probeAndUpdate should be called in a new goroutine.
223 func (wkr *worker) probeAndUpdate() {
224         wkr.mtx.Lock()
225         updated := wkr.updated
226         initialState := wkr.state
227         wkr.mtx.Unlock()
228
229         var (
230                 booted   bool
231                 ctrUUIDs []string
232                 ok       bool
233                 stderr   []byte // from probeBooted
234         )
235
236         switch initialState {
237         case StateShutdown:
238                 return
239         case StateIdle, StateRunning:
240                 booted = true
241         case StateUnknown, StateBooting:
242         default:
243                 panic(fmt.Sprintf("unknown state %s", initialState))
244         }
245
246         probeStart := time.Now()
247         logger := wkr.logger.WithField("ProbeStart", probeStart)
248
249         if !booted {
250                 booted, stderr = wkr.probeBooted()
251                 if !booted {
252                         // Pretend this probe succeeded if another
253                         // concurrent attempt succeeded.
254                         wkr.mtx.Lock()
255                         booted = wkr.state == StateRunning || wkr.state == StateIdle
256                         wkr.mtx.Unlock()
257                 }
258                 if booted {
259                         logger.Info("instance booted; will try probeRunning")
260                 }
261         }
262         reportedBroken := false
263         if booted || wkr.state == StateUnknown {
264                 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
265         }
266         wkr.mtx.Lock()
267         defer wkr.mtx.Unlock()
268         if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
269                 logger.Info("probe reported broken instance")
270                 wkr.reportBootOutcome(BootOutcomeFailed)
271                 wkr.setIdleBehavior(IdleBehaviorDrain)
272         }
273         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
274                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
275                         // Skip the logging noise if shutdown was
276                         // initiated during probe.
277                         return
278                 }
279                 // Using the start time of the probe as the timeout
280                 // threshold ensures we always initiate at least one
281                 // probe attempt after the boot/probe timeout expires
282                 // (otherwise, a slow probe failure could cause us to
283                 // shut down an instance even though it did in fact
284                 // boot/recover before the timeout expired).
285                 dur := probeStart.Sub(wkr.probed)
286                 if wkr.shutdownIfBroken(dur) {
287                         // stderr from failed run-probes will have
288                         // been logged already, but boot-probe
289                         // failures are normal so they are logged only
290                         // at Debug level. This is our chance to log
291                         // some evidence about why the node never
292                         // booted, even in non-debug mode.
293                         if !booted {
294                                 wkr.reportBootOutcome(BootOutcomeFailed)
295                                 logger.WithFields(logrus.Fields{
296                                         "Duration": dur,
297                                         "stderr":   string(stderr),
298                                 }).Info("boot failed")
299                         }
300                 }
301                 return
302         }
303
304         updateTime := time.Now()
305         wkr.probed = updateTime
306
307         if updated != wkr.updated {
308                 // Worker was updated after the probe began, so
309                 // wkr.running might have a container UUID that was
310                 // not yet running when ctrUUIDs was generated. Leave
311                 // wkr.running alone and wait for the next probe to
312                 // catch up on any changes.
313                 return
314         }
315
316         if len(ctrUUIDs) > 0 {
317                 wkr.busy = updateTime
318                 wkr.lastUUID = ctrUUIDs[0]
319         } else if len(wkr.running) > 0 {
320                 // Actual last-busy time was sometime between wkr.busy
321                 // and now. Now is the earliest opportunity to take
322                 // advantage of the non-busy state, though.
323                 wkr.busy = updateTime
324         }
325
326         changed := wkr.updateRunning(ctrUUIDs)
327
328         // Update state if this was the first successful boot-probe.
329         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
330                 if wkr.state == StateBooting {
331                         wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
332                 }
333                 // Note: this will change again below if
334                 // len(wkr.starting)+len(wkr.running) > 0.
335                 wkr.state = StateIdle
336                 changed = true
337         }
338
339         // If wkr.state and wkr.running aren't changing then there's
340         // no need to log anything, notify the scheduler, move state
341         // back and forth between idle/running, etc.
342         if !changed {
343                 return
344         }
345
346         // Log whenever a run-probe reveals crunch-run processes
347         // appearing/disappearing before boot-probe succeeds.
348         if wkr.state == StateUnknown {
349                 logger.WithFields(logrus.Fields{
350                         "RunningContainers": len(wkr.running),
351                         "State":             wkr.state,
352                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
353         }
354
355         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
356                 wkr.state = StateRunning
357         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
358                 wkr.state = StateIdle
359         }
360         wkr.updated = updateTime
361         if booted && (initialState == StateUnknown || initialState == StateBooting) {
362                 wkr.reportBootOutcome(BootOutcomeSucceeded)
363                 logger.WithFields(logrus.Fields{
364                         "RunningContainers": len(wkr.running),
365                         "State":             wkr.state,
366                 }).Info("probes succeeded, instance is in service")
367         }
368         go wkr.wp.notify()
369 }
370
371 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
372         cmd := wkr.wp.runnerCmd + " --list"
373         if u := wkr.instance.RemoteUser(); u != "root" {
374                 cmd = "sudo " + cmd
375         }
376         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
377         if err != nil {
378                 wkr.logger.WithFields(logrus.Fields{
379                         "Command": cmd,
380                         "stdout":  string(stdout),
381                         "stderr":  string(stderr),
382                 }).WithError(err).Warn("probe failed")
383                 return
384         }
385         ok = true
386
387         staleRunLock := false
388         for _, s := range strings.Split(string(stdout), "\n") {
389                 // Each line of the "crunch-run --list" output is one
390                 // of the following:
391                 //
392                 // * a container UUID, indicating that processes
393                 //   related to that container are currently running.
394                 //   Optionally followed by " stale", indicating that
395                 //   the crunch-run process itself has exited (the
396                 //   remaining process is probably arv-mount).
397                 //
398                 // * the string "broken", indicating that the instance
399                 //   appears incapable of starting containers.
400                 //
401                 // See ListProcesses() in lib/crunchrun/background.go.
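                    //
                    // For example (hypothetical UUIDs), stdout consisting of
                    //
                    //      zzzzz-dz642-000000000000001
                    //      zzzzz-dz642-000000000000002 stale
                    //      broken
                    //
                    // yields running == []string{"zzzzz-dz642-000000000000001"},
                    // reportsBroken == true, and staleRunLock == true for this
                    // probe.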
402                 if s == "" {
403                         // empty string following final newline
404                 } else if s == "broken" {
405                         reportsBroken = true
406                 } else if toks := strings.Split(s, " "); len(toks) == 1 {
407                         running = append(running, s)
408                 } else if toks[1] == "stale" {
409                         wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
410                         staleRunLock = true
411                 }
412         }
413         wkr.mtx.Lock()
414         defer wkr.mtx.Unlock()
415         if !staleRunLock {
416                 wkr.staleRunLockSince = time.Time{}
417         } else if wkr.staleRunLockSince.IsZero() {
418                 wkr.staleRunLockSince = time.Now()
419         } else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
420                 wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
421                 reportsBroken = true
422         }
423         return
424 }
425
426 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
427         cmd := wkr.wp.bootProbeCommand
428         if cmd == "" {
429                 cmd = "true"
430         }
431         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
432         logger := wkr.logger.WithFields(logrus.Fields{
433                 "Command": cmd,
434                 "stdout":  string(stdout),
435                 "stderr":  string(stderr),
436         })
437         if err != nil {
438                 logger.WithError(err).Debug("boot probe failed")
439                 return false, stderr
440         }
441         logger.Info("boot probe succeeded")
442         if err = wkr.wp.loadRunnerData(); err != nil {
443                 wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
444                 return false, stderr
445         } else if len(wkr.wp.runnerData) == 0 {
446                 // Assume crunch-run is already installed
447         } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
448                 wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
449                 return false, stderr2
450         } else {
451                 stderr = append(stderr, stderr2...)
452         }
453         return true, stderr
454 }
455
456 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
457         hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
458         dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
459         logger := wkr.logger.WithFields(logrus.Fields{
460                 "hash": hash,
461                 "path": wkr.wp.runnerCmd,
462         })
463
464         stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
465         if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
466                 logger.Info("runner binary already exists on worker, with correct hash")
467                 return
468         }
469
470         // Note touch+chmod come before writing data, to avoid the
471         // possibility of md5 being correct while file mode is
472         // incorrect.
473         cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
474         if wkr.instance.RemoteUser() != "root" {
475                 cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
476         }
477         logger.WithField("cmd", cmd).Info("installing runner binary on worker")
478         stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
479         return
480 }
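
    // For illustration, with wkr.wp.runnerCmd set to the hypothetical
    // path "/tmp/arvados-crunch-run/crunch-run" and a non-root remote
    // user, the install command built above is roughly:
    //
    //      sudo sh -c 'set -e; dstdir="/tmp/arvados-crunch-run/"; dstfile="/tmp/arvados-crunch-run/crunch-run"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"'
    //
    // with the runner binary streamed to the remote "cat" via the
    // executor's stdin.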
481
482 // caller must have lock.
483 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
484         if wkr.idleBehavior == IdleBehaviorHold {
485                 // Never shut down.
486                 return false
487         }
488         label, threshold := "", wkr.wp.timeoutProbe
489         if wkr.state == StateUnknown || wkr.state == StateBooting {
490                 label, threshold = "new ", wkr.wp.timeoutBooting
491         }
492         if dur < threshold {
493                 return false
494         }
495         wkr.logger.WithFields(logrus.Fields{
496                 "Duration": dur,
497                 "Since":    wkr.probed,
498                 "State":    wkr.state,
499         }).Warnf("%sinstance unresponsive, shutting down", label)
500         wkr.shutdown()
501         return true
502 }
503
504 // Returns true if the instance is eligible for shutdown: either it's
505 // been idle too long, or idleBehavior=Drain and nothing is running.
506 //
507 // caller must have lock.
508 func (wkr *worker) eligibleForShutdown() bool {
509         if wkr.idleBehavior == IdleBehaviorHold {
510                 return false
511         }
512         draining := wkr.idleBehavior == IdleBehaviorDrain
513         switch wkr.state {
514         case StateBooting:
515                 return draining
516         case StateIdle:
517                 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
518         case StateRunning:
519                 if !draining {
520                         return false
521                 }
522                 for _, rr := range wkr.running {
523                         if !rr.givenup {
524                                 return false
525                         }
526                 }
527                 for _, rr := range wkr.starting {
528                         if !rr.givenup {
529                                 return false
530                         }
531                 }
532                 // draining, and all remaining runners are just trying
533                 // to force-kill their crunch-run procs
534                 return true
535         default:
536                 return false
537         }
538 }
539
540 // caller must have lock.
541 func (wkr *worker) shutdownIfIdle() bool {
542         if !wkr.eligibleForShutdown() {
543                 return false
544         }
545         wkr.logger.WithFields(logrus.Fields{
546                 "State":        wkr.state,
547                 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
548                 "IdleBehavior": wkr.idleBehavior,
549         }).Info("shutdown worker")
550         wkr.reportBootOutcome(BootOutcomeAborted)
551         wkr.shutdown()
552         return true
553 }
554
555 // caller must have lock.
556 func (wkr *worker) shutdown() {
557         now := time.Now()
558         wkr.updated = now
559         wkr.destroyed = now
560         wkr.state = StateShutdown
561         go wkr.wp.notify()
562         go func() {
563                 err := wkr.instance.Destroy()
564                 if err != nil {
565                         wkr.logger.WithError(err).Warn("shutdown failed")
566                         return
567                 }
568         }()
569 }
570
571 // Save worker tags to cloud provider metadata, if they don't already
572 // match. Caller must have lock.
573 func (wkr *worker) saveTags() {
574         instance := wkr.instance
575         tags := instance.Tags()
576         update := cloud.InstanceTags{
577                 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
578                 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
579         }
580         save := false
581         for k, v := range update {
582                 if tags[k] != v {
583                         tags[k] = v
584                         save = true
585                 }
586         }
587         if save {
588                 go func() {
589                         err := instance.SetTags(tags)
590                         if err != nil {
591                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warn("error updating tags")
592                         }
593                 }()
594         }
595 }
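
    // For illustration (hypothetical values: tagKeyPrefix "Arvados",
    // instance type "m5.large", idle behavior IdleBehaviorRun), the
    // update map built above would be
    //
    //      cloud.InstanceTags{
    //              "Arvados" + tagKeyInstanceType: "m5.large",
    //              "Arvados" + tagKeyIdleBehavior: "run",
    //      }
    //
    // and SetTags runs in a goroutine only when at least one of these
    // entries differs from the instance's existing tags.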
596
597 func (wkr *worker) Close() {
598         // This might take time, so do it after unlocking mtx.
599         defer wkr.executor.Close()
600
601         wkr.mtx.Lock()
602         defer wkr.mtx.Unlock()
603         for uuid, rr := range wkr.running {
604                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
605                 rr.Close()
606         }
607         for uuid, rr := range wkr.starting {
608                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
609                 rr.Close()
610         }
611 }
612
613 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
614 // probe. Returns true if anything was added or removed.
615 //
616 // Caller must have lock.
617 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
618         alive := map[string]bool{}
619         for _, uuid := range ctrUUIDs {
620                 alive[uuid] = true
621                 if _, ok := wkr.running[uuid]; ok {
622                         // unchanged
623                 } else if rr, ok := wkr.starting[uuid]; ok {
624                         wkr.running[uuid] = rr
625                         delete(wkr.starting, uuid)
626                         changed = true
627                 } else {
628                         // We didn't start it -- it must have been
629                         // started by a previous dispatcher process.
630                         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
631                         wkr.running[uuid] = newRemoteRunner(uuid, wkr)
632                         changed = true
633                 }
634         }
635         for uuid := range wkr.running {
636                 if !alive[uuid] {
637                         wkr.closeRunner(uuid)
638                         changed = true
639                 }
640         }
641         return
642 }
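
    // Worked example (hypothetical UUIDs): if wkr.running == {A} and
    // wkr.starting == {B}, and a probe reports ctrUUIDs == []string{B, C},
    // then afterwards wkr.running == {B, C}, wkr.starting is empty, A has
    // been handed to closeRunner, and updateRunning returns true.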
643
644 // caller must have lock.
645 func (wkr *worker) closeRunner(uuid string) {
646         rr := wkr.running[uuid]
647         if rr == nil {
648                 return
649         }
650         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
651         delete(wkr.running, uuid)
652         rr.Close()
653
654         now := time.Now()
655         wkr.updated = now
656         wkr.wp.exited[uuid] = now
657         if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
658                 wkr.state = StateIdle
659         }
660 }