lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "fmt"
        "path/filepath"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
        BootOutcomeFailed      BootOutcome = "failure"
        BootOutcomeSucceeded   BootOutcome = "success"
        BootOutcomeAborted     BootOutcome = "aborted"
        BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
        BootOutcomeFailed:      true,
        BootOutcomeSucceeded:   true,
        BootOutcomeAborted:     true,
        BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}

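// A worker maintains the dispatcher's view of a single cloud
// instance: its lifecycle state, desired idle behavior, the
// containers it is starting or running, and the timestamps used for
// probe, idle-timeout, and metrics decisions. Mutable fields are
// protected by mtx, which must be the pool's Locker.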
type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx                 sync.Locker // must be wp's Locker.
        state               State
        idleBehavior        IdleBehavior
        instance            cloud.Instance
        instType            arvados.InstanceType
        vcpus               int64
        memory              int64
        appeared            time.Time
        probed              time.Time
        updated             time.Time
        busy                time.Time
        destroyed           time.Time
        firstSSHConnection  time.Time
        lastUUID            string
        running             map[string]*remoteRunner // remember to update state idle<->running when this changes
        starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing             chan struct{}
        bootOutcomeReported bool
        timeToReadyReported bool
}

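// onUnkillable is called by a remoteRunner when it has given up
// trying to kill its crunch-run process. Unless the worker is on
// hold, it is drained so no further containers are scheduled on it.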
func (wkr *worker) onUnkillable(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        logger := wkr.logger.WithField("ContainerUUID", uuid)
        if wkr.idleBehavior == IdleBehaviorHold {
                logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
                return
        }
        logger.Warn("unkillable container, draining worker")
        wkr.setIdleBehavior(IdleBehaviorDrain)
}

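// onKilled is called by a remoteRunner when its crunch-run process
// has ended after a kill request: the runner is removed from
// wkr.running and the pool's subscribers are notified.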
func (wkr *worker) onKilled(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        wkr.closeRunner(uuid)
        go wkr.wp.notify()
}

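// reportBootOutcome increments the pool's boot-outcome metric
// (mBootOutcomes) with the given outcome label, at most once per
// worker.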
// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
        if wkr.bootOutcomeReported {
                return
        }
        if wkr.wp.mBootOutcomes != nil {
                wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
        }
        wkr.bootOutcomeReported = true
}

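// reportTimeBetweenFirstSSHAndReadyForContainer records, at most
// once per worker, the time elapsed between the first successful SSH
// connection and the worker becoming ready to run containers.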
// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
        if wkr.timeToReadyReported {
                return
        }
        if wkr.wp.mTimeToReadyForContainer != nil {
                wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
        }
        wkr.timeToReadyReported = true
}

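// setIdleBehavior sets the worker's idle behavior, saves it in the
// instance's cloud tags, and shuts the worker down right away if the
// new behavior makes it eligible for shutdown.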
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
}

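// startContainer records the container as starting, moves the worker
// to StateRunning if needed, and invokes crunch-run on the instance
// in a new goroutine; when the start attempt returns, the container
// is moved from wkr.starting to wkr.running.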
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger.Debug("starting container")
        rr := newRemoteRunner(ctr.UUID, wkr)
        wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
                rr.Start()
                if wkr.wp.mTimeFromQueueToCrunchRun != nil {
                        wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
                }
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        initialState := wkr.state
        wkr.mtx.Unlock()

        var (
                booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
        )

        switch initialState {
        case StateShutdown:
                return
        case StateIdle, StateRunning:
                booted = true
        case StateUnknown, StateBooting:
        default:
                panic(fmt.Sprintf("unknown state %s", initialState))
        }

        probeStart := time.Now()
        logger := wkr.logger.WithField("ProbeStart", probeStart)

        if !booted {
                booted, stderr = wkr.probeBooted()
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
                        booted = wkr.state == StateRunning || wkr.state == StateIdle
                        wkr.mtx.Unlock()
                }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
        if booted || wkr.state == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
                logger.Info("probe reported broken instance")
                wkr.reportBootOutcome(BootOutcomeFailed)
                wkr.setIdleBehavior(IdleBehaviorDrain)
        }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                // Using the start time of the probe as the timeout
                // threshold ensures we always initiate at least one
                // probe attempt after the boot/probe timeout expires
                // (otherwise, a slow probe failure could cause us to
                // shut down an instance even though it did in fact
                // boot/recover before the timeout expired).
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
                        // been logged already, but boot-probe
                        // failures are normal so they are logged only
                        // at Debug level. This is our chance to log
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                wkr.reportBootOutcome(BootOutcomeFailed)
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
                                }).Info("boot failed")
                        }
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }

        changed := wkr.updateRunning(ctrUUIDs)

        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                if wkr.state == StateBooting {
                        wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
                }
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }

        // If wkr.state and wkr.running aren't changing then there's
        // no need to log anything, notify the scheduler, move state
        // back and forth between idle/running, etc.
        if !changed {
                return
        }

        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
        if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }

        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                wkr.reportBootOutcome(BootOutcomeSucceeded)
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
}

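// probeRunning lists the crunch-run processes on the instance by
// running the runner command with --list (via sudo unless the remote
// user is root). It returns the container UUIDs found, whether the
// instance reported itself as broken, and whether the probe command
// itself succeeded.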
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                return
        }
        ok = true
        for _, s := range strings.Split(string(stdout), "\n") {
                if s == "broken" {
                        reportsBroken = true
                } else if s != "" {
                        running = append(running, s)
                }
        }
        return
}

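// probeBooted runs the configured boot probe command on the
// instance. On success, it also makes sure the crunch-run binary is
// installed on the instance (see copyRunnerData), and returns any
// accumulated stderr for logging.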
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
                return false, stderr
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
        } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
                wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
                return false, stderr2
        } else {
                stderr = append(stderr, stderr2...)
        }
        return true, stderr
}

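// copyRunnerData installs the crunch-run binary at wkr.wp.runnerCmd
// on the instance, unless a copy with the expected md5 hash is
// already there.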
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
        logger := wkr.logger.WithFields(logrus.Fields{
                "hash": hash,
                "path": wkr.wp.runnerCmd,
        })

        stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
        if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
                logger.Info("runner binary already exists on worker, with correct hash")
                return
        }

        // Note touch+chmod come before writing data, to avoid the
        // possibility of md5 being correct while file mode is
        // incorrect.
        cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
        if wkr.instance.RemoteUser() != "root" {
                cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
        }
        logger.WithField("cmd", cmd).Info("installing runner binary on worker")
        stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
        return
}

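// shutdownIfBroken shuts down the worker if it has been unresponsive
// for longer than the applicable boot/probe timeout (and is not on
// hold). It returns true if a shutdown was initiated.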
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
        return true
}

// eligibleForShutdown returns true if the instance is eligible for
// shutdown: either it has been idle too long, or idleBehavior=Drain
// and nothing is running (or everything still listed as starting or
// running has already been given up on).
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
        draining := wkr.idleBehavior == IdleBehaviorDrain
        switch wkr.state {
        case StateBooting:
                return draining
        case StateIdle:
                return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
        case StateRunning:
                if !draining {
                        return false
                }
                for _, rr := range wkr.running {
                        if !rr.givenup {
                                return false
                        }
                }
                for _, rr := range wkr.starting {
                        if !rr.givenup {
                                return false
                        }
                }
                // draining, and all remaining runners are just trying
                // to force-kill their crunch-run procs
                return true
        default:
                return false
        }
}

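// shutdownIfIdle shuts the worker down if it is eligible for
// shutdown (see eligibleForShutdown), and returns true if it did.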
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if !wkr.eligibleForShutdown() {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return true
}

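// shutdown marks the worker as destroyed, notifies the pool, and
// destroys the cloud instance in a separate goroutine.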
// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                        return
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
                wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
                wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {
                if tags[k] != v {
                        tags[k] = v
                        save = true
                }
        }
        if save {
                go func() {
                        err := instance.SetTags(tags)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
                }()
        }
}

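// Close releases the worker's executor and abandons any crunch-run
// processes still recorded as starting or running on the instance.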
func (wkr *worker) Close() {
        // This might take time, so do it after unlocking mtx.
        defer wkr.executor.Close()

        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
        alive := map[string]bool{}
        for _, uuid := range ctrUUIDs {
                alive[uuid] = true
                if _, ok := wkr.running[uuid]; ok {
                        // unchanged
                } else if rr, ok := wkr.starting[uuid]; ok {
                        wkr.running[uuid] = rr
                        delete(wkr.starting, uuid)
                        changed = true
                } else {
                        // We didn't start it -- it must have been
                        // started by a previous dispatcher process.
                        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
                        wkr.running[uuid] = newRemoteRunner(uuid, wkr)
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if !alive[uuid] {
                        wkr.closeRunner(uuid)
                        changed = true
                }
        }
        return
}

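// closeRunner removes the runner for the given container UUID from
// wkr.running (if present), records its exit time in the pool, and
// returns the worker to StateIdle if nothing is left starting or
// running.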
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
        rr := wkr.running[uuid]
        if rr == nil {
                return
        }
        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
        delete(wkr.running, uuid)
        rr.Close()

        now := time.Now()
        wkr.updated = now
        wkr.wp.exited[uuid] = now
        if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                wkr.state = StateIdle
        }
}