1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "fmt"
10         "path/filepath"
11         "strings"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/lib/cloud"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/stats"
18         "github.com/sirupsen/logrus"
19 )
20
21 const (
22         // TODO: configurable
23         maxPingFailTime = 10 * time.Minute
24 )
25
26 // State indicates whether a worker is available to do work, and (if
27 // not) whether/when it is expected to become ready.
28 type State int
29
30 const (
31         StateUnknown  State = iota // might be running a container already
32         StateBooting               // instance is booting
33         StateIdle                  // instance booted, no containers are running
34         StateRunning               // instance is running one or more containers
35         StateShutdown              // worker has stopped monitoring the instance
36 )
37
38 var stateString = map[State]string{
39         StateUnknown:  "unknown",
40         StateBooting:  "booting",
41         StateIdle:     "idle",
42         StateRunning:  "running",
43         StateShutdown: "shutdown",
44 }
45
46 // BootOutcome is the result of a worker boot. It is used as a label in a metric.
47 type BootOutcome string
48
49 const (
50         BootOutcomeFailed       BootOutcome = "failure"
51         BootOutcomeSucceeded    BootOutcome = "success"
52         BootOutcomeIdleShutdown BootOutcome = "idle shutdown"
53         BootOutcomeDisappeared  BootOutcome = "disappeared"
54 )
55
56 var validBootOutcomes = map[BootOutcome]bool{
57         BootOutcomeFailed:       true,
58         BootOutcomeSucceeded:    true,
59         BootOutcomeIdleShutdown: true,
60         BootOutcomeDisappeared:  true,
61 }
62
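// reportBootOutcome increments the pool's boot-outcome metric (if the
// pool has one configured) with the given outcome as its label. Only
// the first report for a worker has any effect. Caller must have lock.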
63 func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
64         if wkr.bootOutcomeReported {
65                 return
66         }
67         if wkr.wp.mBootOutcomes != nil {
68                 wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
69         }
70         wkr.bootOutcomeReported = true
71 }
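// A minimal sketch of how the pool-side mBootOutcomes counter might be
// set up (the metric name, namespace, and "reg" registry shown here are
// assumptions for illustration, not necessarily the actual pool code):
//
//	mBootOutcomes := prometheus.NewCounterVec(prometheus.CounterOpts{
//		Namespace: "arvados",
//		Subsystem: "dispatchcloud",
//		Name:      "boot_outcomes",
//		Help:      "Count of workers that have finished booting, by outcome.",
//	}, []string{"outcome"})
//	reg.MustRegister(mBootOutcomes)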
72
73 // String implements fmt.Stringer.
74 func (s State) String() string {
75         return stateString[s]
76 }
77
78 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
79 // map[State]anything uses the state's string representation.
80 func (s State) MarshalText() ([]byte, error) {
81         return []byte(stateString[s]), nil
82 }
83
84 // IdleBehavior indicates the behavior desired when a node becomes idle.
85 type IdleBehavior string
86
87 const (
88         IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
89         IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
90         IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
91 )
92
93 var validIdleBehavior = map[IdleBehavior]bool{
94         IdleBehaviorRun:   true,
95         IdleBehaviorHold:  true,
96         IdleBehaviorDrain: true,
97 }
98
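// worker tracks a single cloud VM instance on behalf of its Pool:
// boot progress, probe results, idle behavior, and the crunch-run
// processes that are starting or running on it.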
99 type worker struct {
100         logger   logrus.FieldLogger
101         executor Executor
102         wp       *Pool
103
104         mtx                 sync.Locker // must be wp's Locker.
105         state               State
106         idleBehavior        IdleBehavior
107         instance            cloud.Instance
108         instType            arvados.InstanceType
109         vcpus               int64
110         memory              int64
111         appeared            time.Time
112         probed              time.Time
113         updated             time.Time
114         busy                time.Time
115         destroyed           time.Time
116         lastUUID            string
117         running             map[string]*remoteRunner // remember to update state idle<->running when this changes
118         starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
119         probing             chan struct{}
120         bootOutcomeReported bool
121 }
122
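// onUnkillable is called by a remoteRunner when it has given up
// trying to kill its container. The worker is drained so no new
// containers are scheduled on it, unless IdleBehavior is Hold.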
123 func (wkr *worker) onUnkillable(uuid string) {
124         wkr.mtx.Lock()
125         defer wkr.mtx.Unlock()
126         logger := wkr.logger.WithField("ContainerUUID", uuid)
127         if wkr.idleBehavior == IdleBehaviorHold {
128                 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
129                 return
130         }
131         logger.Warn("unkillable container, draining worker")
132         wkr.setIdleBehavior(IdleBehaviorDrain)
133 }
134
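// onKilled is called by a remoteRunner when its crunch-run process
// has been killed: the runner is closed and the pool's subscribers
// are notified.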
135 func (wkr *worker) onKilled(uuid string) {
136         wkr.mtx.Lock()
137         defer wkr.mtx.Unlock()
138         wkr.closeRunner(uuid)
139         go wkr.wp.notify()
140 }
141
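// setIdleBehavior updates the worker's idle behavior, saves it as an
// instance tag, and initiates shutdown right away if the change makes
// the worker eligible.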
142 // caller must have lock.
143 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
144         wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
145         wkr.idleBehavior = idleBehavior
146         wkr.saveTags()
147         wkr.shutdownIfIdle()
148 }
149
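// startContainer creates a remoteRunner for the given container and
// starts it in a new goroutine, moving the worker to StateRunning if
// it isn't there already.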
150 // caller must have lock.
151 func (wkr *worker) startContainer(ctr arvados.Container) {
152         logger := wkr.logger.WithFields(logrus.Fields{
153                 "ContainerUUID": ctr.UUID,
154                 "Priority":      ctr.Priority,
155         })
156         logger.Debug("starting container")
157         rr := newRemoteRunner(ctr.UUID, wkr)
158         wkr.starting[ctr.UUID] = rr
159         if wkr.state != StateRunning {
160                 wkr.state = StateRunning
161                 go wkr.wp.notify()
162         }
163         go func() {
164                 rr.Start()
165                 wkr.mtx.Lock()
166                 defer wkr.mtx.Unlock()
167                 now := time.Now()
168                 wkr.updated = now
169                 wkr.busy = now
170                 delete(wkr.starting, ctr.UUID)
171                 wkr.running[ctr.UUID] = rr
172                 wkr.lastUUID = ctr.UUID
173         }()
174 }
175
176 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
177 // for the worker's current state. If a previous probe is still
178 // running, it does nothing.
179 //
180 // It should be called in a new goroutine.
181 func (wkr *worker) ProbeAndUpdate() {
182         select {
183         case wkr.probing <- struct{}{}:
184                 wkr.probeAndUpdate()
185                 <-wkr.probing
186         default:
187                 wkr.logger.Debug("still waiting for last probe to finish")
188         }
189 }
190
191 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
192 // updates state accordingly.
193 //
194 // In StateUnknown: Call both probeBooted and probeRunning.
195 // In StateBooting: Call probeBooted; if successful, call probeRunning.
196 // In StateRunning: Call probeRunning.
197 // In StateIdle: Call probeRunning.
198 // In StateShutdown: Do nothing.
199 //
200 // If both probes succeed, wkr.state changes to
201 // StateIdle/StateRunning.
202 //
203 // If probeRunning succeeds, wkr.running is updated. (This means
204 // wkr.running might be non-empty even in StateUnknown, if the boot
205 // probe failed.)
206 //
207 // probeAndUpdate should be called in a new goroutine.
208 func (wkr *worker) probeAndUpdate() {
209         wkr.mtx.Lock()
210         updated := wkr.updated
211         initialState := wkr.state
212         wkr.mtx.Unlock()
213
214         var (
215                 booted   bool
216                 ctrUUIDs []string
217                 ok       bool
218                 stderr   []byte // from probeBooted
219         )
220
221         switch initialState {
222         case StateShutdown:
223                 return
224         case StateIdle, StateRunning:
225                 booted = true
226         case StateUnknown, StateBooting:
227         default:
228                 panic(fmt.Sprintf("unknown state %s", initialState))
229         }
230
231         probeStart := time.Now()
232         logger := wkr.logger.WithField("ProbeStart", probeStart)
233
234         if !booted {
235                 booted, stderr = wkr.probeBooted()
236                 if !booted {
237                         // Pretend this probe succeeded if another
238                         // concurrent attempt succeeded.
239                         wkr.mtx.Lock()
240                         booted = wkr.state == StateRunning || wkr.state == StateIdle
241                         wkr.mtx.Unlock()
242                 }
243                 if booted {
244                         logger.Info("instance booted; will try probeRunning")
245                 }
246         }
247         reportedBroken := false
248         if booted || wkr.state == StateUnknown {
249                 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
250         }
251         wkr.mtx.Lock()
252         defer wkr.mtx.Unlock()
253         if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
254                 logger.Info("probe reported broken instance")
255                 wkr.reportBootOutcome(BootOutcomeFailed)
256                 wkr.setIdleBehavior(IdleBehaviorDrain)
257         }
258         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
259                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
260                         // Skip the logging noise if shutdown was
261                         // initiated during probe.
262                         return
263                 }
264                 // Using the start time of the probe as the timeout
265                 // threshold ensures we always initiate at least one
266                 // probe attempt after the boot/probe timeout expires
267                 // (otherwise, a slow probe failure could cause us to
268                 // shut down an instance even though it did in fact
269                 // boot/recover before the timeout expired).
270                 dur := probeStart.Sub(wkr.probed)
271                 if wkr.shutdownIfBroken(dur) {
272                         // stderr from failed run-probes will have
273                         // been logged already, but boot-probe
274                         // failures are normal so they are logged only
275                         // at Debug level. This is our chance to log
276                         // some evidence about why the node never
277                         // booted, even in non-debug mode.
278                         if !booted {
279                                 wkr.reportBootOutcome(BootOutcomeFailed)
280                                 logger.WithFields(logrus.Fields{
281                                         "Duration": dur,
282                                         "stderr":   string(stderr),
283                                 }).Info("boot failed")
284                         }
285                 }
286                 return
287         }
288
289         updateTime := time.Now()
290         wkr.probed = updateTime
291
292         if updated != wkr.updated {
293                 // Worker was updated after the probe began, so
294                 // wkr.running might have a container UUID that was
295                 // not yet running when ctrUUIDs was generated. Leave
296                 // wkr.running alone and wait for the next probe to
297                 // catch up on any changes.
298                 return
299         }
300
301         if len(ctrUUIDs) > 0 {
302                 wkr.busy = updateTime
303                 wkr.lastUUID = ctrUUIDs[0]
304         } else if len(wkr.running) > 0 {
305                 // Actual last-busy time was sometime between wkr.busy
306                 // and now. Now is the earliest opportunity to take
307                 // advantage of the non-busy state, though.
308                 wkr.busy = updateTime
309         }
310
311         changed := wkr.updateRunning(ctrUUIDs)
312
313         // Update state if this was the first successful boot-probe.
314         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
315                 // Note: this will change again below if
316                 // len(wkr.starting)+len(wkr.running) > 0.
317                 wkr.state = StateIdle
318                 changed = true
319         }
320
321         // If wkr.state and wkr.running aren't changing then there's
322         // no need to log anything, notify the scheduler, move state
323         // back and forth between idle/running, etc.
324         if !changed {
325                 return
326         }
327
328         // Log whenever a run-probe reveals crunch-run processes
329         // appearing/disappearing before boot-probe succeeds.
330         if wkr.state == StateUnknown && changed {
331                 logger.WithFields(logrus.Fields{
332                         "RunningContainers": len(wkr.running),
333                         "State":             wkr.state,
334                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
335         }
336
337         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
338                 wkr.state = StateRunning
339         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
340                 wkr.state = StateIdle
341         }
342         wkr.updated = updateTime
343         if booted && (initialState == StateUnknown || initialState == StateBooting) {
344                 wkr.reportBootOutcome(BootOutcomeSucceeded)
345                 logger.WithFields(logrus.Fields{
346                         "RunningContainers": len(wkr.running),
347                         "State":             wkr.state,
348                 }).Info("probes succeeded, instance is in service")
349         }
350         go wkr.wp.notify()
351 }
352
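// probeRunning lists the crunch-run processes on the instance. It
// returns the UUIDs of the containers reported as running, whether
// the instance reports itself broken, and whether the probe command
// itself succeeded.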
353 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
354         cmd := wkr.wp.runnerCmd + " --list"
355         if u := wkr.instance.RemoteUser(); u != "root" {
356                 cmd = "sudo " + cmd
357         }
358         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
359         if err != nil {
360                 wkr.logger.WithFields(logrus.Fields{
361                         "Command": cmd,
362                         "stdout":  string(stdout),
363                         "stderr":  string(stderr),
364                 }).WithError(err).Warn("probe failed")
365                 return
366         }
367         ok = true
368         for _, s := range strings.Split(string(stdout), "\n") {
369                 if s == "broken" {
370                         reportsBroken = true
371                 } else if s != "" {
372                         running = append(running, s)
373                 }
374         }
375         return
376 }
377
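// probeBooted runs the configured boot probe command ("true" if none
// is configured) and, on success, makes sure the crunch-run binary is
// installed on the instance. It returns whether the probe succeeded,
// along with any stderr output collected.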
378 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
379         cmd := wkr.wp.bootProbeCommand
380         if cmd == "" {
381                 cmd = "true"
382         }
383         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
384         logger := wkr.logger.WithFields(logrus.Fields{
385                 "Command": cmd,
386                 "stdout":  string(stdout),
387                 "stderr":  string(stderr),
388         })
389         if err != nil {
390                 logger.WithError(err).Debug("boot probe failed")
391                 return false, stderr
392         }
393         logger.Info("boot probe succeeded")
394         if err = wkr.wp.loadRunnerData(); err != nil {
395                 wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
396                 return false, stderr
397         } else if len(wkr.wp.runnerData) == 0 {
398                 // Assume crunch-run is already installed
399         } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
400                 wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
401                 return false, stderr2
402         } else {
403                 stderr = append(stderr, stderr2...)
404         }
405         return true, stderr
406 }
407
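// copyRunnerData installs the crunch-run binary at wkr.wp.runnerCmd
// on the instance, unless a copy with the expected MD5 hash is
// already present there.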
408 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
409         hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
410         dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
411         logger := wkr.logger.WithFields(logrus.Fields{
412                 "hash": hash,
413                 "path": wkr.wp.runnerCmd,
414         })
415
416         stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
417         if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
418                 logger.Info("runner binary already exists on worker, with correct hash")
419                 return
420         }
421
422         // Note touch+chmod come before writing data, to avoid the
423         // possibility of md5 being correct while file mode is
424         // incorrect.
425         cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
426         if wkr.instance.RemoteUser() != "root" {
427                 cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
428         }
429         logger.WithField("cmd", cmd).Info("installing runner binary on worker")
430         stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
431         return
432 }
433
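// shutdownIfBroken initiates shutdown if the worker has been
// unresponsive for longer than the applicable boot/probe timeout.
// Returns true if shutdown was initiated.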
434 // caller must have lock.
435 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
436         if wkr.idleBehavior == IdleBehaviorHold {
437                 // Never shut down.
438                 return false
439         }
440         label, threshold := "", wkr.wp.timeoutProbe
441         if wkr.state == StateUnknown || wkr.state == StateBooting {
442                 label, threshold = "new ", wkr.wp.timeoutBooting
443         }
444         if dur < threshold {
445                 return false
446         }
447         wkr.logger.WithFields(logrus.Fields{
448                 "Duration": dur,
449                 "Since":    wkr.probed,
450                 "State":    wkr.state,
451         }).Warnf("%sinstance unresponsive, shutting down", label)
452         wkr.shutdown()
453         return true
454 }
455
456 // Returns true if the instance is eligible for shutdown: either it's
457 // been idle too long, or idleBehavior=Drain and no work remains on it.
458 //
459 // caller must have lock.
460 func (wkr *worker) eligibleForShutdown() bool {
461         if wkr.idleBehavior == IdleBehaviorHold {
462                 return false
463         }
464         draining := wkr.idleBehavior == IdleBehaviorDrain
465         switch wkr.state {
466         case StateBooting:
467                 return draining
468         case StateIdle:
469                 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
470         case StateRunning:
471                 if !draining {
472                         return false
473                 }
474                 for _, rr := range wkr.running {
475                         if !rr.givenup {
476                                 return false
477                         }
478                 }
479                 for _, rr := range wkr.starting {
480                         if !rr.givenup {
481                                 return false
482                         }
483                 }
484                 // draining, and all remaining runners are just trying
485                 // to force-kill their crunch-run procs
486                 return true
487         default:
488                 return false
489         }
490 }
491
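// shutdownIfIdle initiates shutdown if the worker is eligible (see
// eligibleForShutdown). Returns true if shutdown was initiated.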
492 // caller must have lock.
493 func (wkr *worker) shutdownIfIdle() bool {
494         if !wkr.eligibleForShutdown() {
495                 return false
496         }
497         wkr.logger.WithFields(logrus.Fields{
498                 "State":        wkr.state,
499                 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
500                 "IdleBehavior": wkr.idleBehavior,
501         }).Info("shutdown worker")
502         wkr.reportBootOutcome(BootOutcomeIdleShutdown)
503         wkr.shutdown()
504         return true
505 }
506
507 // caller must have lock.
508 func (wkr *worker) shutdown() {
509         now := time.Now()
510         wkr.updated = now
511         wkr.destroyed = now
512         wkr.state = StateShutdown
513         go wkr.wp.notify()
514         go func() {
515                 err := wkr.instance.Destroy()
516                 if err != nil {
517                         wkr.logger.WithError(err).Warn("shutdown failed")
518                         return
519                 }
520         }()
521 }
522
523 // Save worker tags to cloud provider metadata, if they don't already
524 // match. Caller must have lock.
525 func (wkr *worker) saveTags() {
526         instance := wkr.instance
527         tags := instance.Tags()
528         update := cloud.InstanceTags{
529                 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
530                 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
531         }
532         save := false
533         for k, v := range update {
534                 if tags[k] != v {
535                         tags[k] = v
536                         save = true
537                 }
538         }
539         if save {
540                 go func() {
541                         err := instance.SetTags(tags)
542                         if err != nil {
543                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
544                         }
545                 }()
546         }
547 }
548
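// Close abandons any crunch-run processes still tracked as running or
// starting on this worker, then closes the remote command executor.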
549 func (wkr *worker) Close() {
550         // This might take time, so do it after unlocking mtx.
551         defer wkr.executor.Close()
552
553         wkr.mtx.Lock()
554         defer wkr.mtx.Unlock()
555         for uuid, rr := range wkr.running {
556                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
557                 rr.Close()
558         }
559         for uuid, rr := range wkr.starting {
560                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
561                 rr.Close()
562         }
563 }
564
565 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
566 // probe. Returns true if anything was added or removed.
567 //
568 // Caller must have lock.
569 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
570         alive := map[string]bool{}
571         for _, uuid := range ctrUUIDs {
572                 alive[uuid] = true
573                 if _, ok := wkr.running[uuid]; ok {
574                         // unchanged
575                 } else if rr, ok := wkr.starting[uuid]; ok {
576                         wkr.running[uuid] = rr
577                         delete(wkr.starting, uuid)
578                         changed = true
579                 } else {
580                         // We didn't start it -- it must have been
581                         // started by a previous dispatcher process.
582                         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
583                         wkr.running[uuid] = newRemoteRunner(uuid, wkr)
584                         changed = true
585                 }
586         }
587         for uuid := range wkr.running {
588                 if !alive[uuid] {
589                         wkr.closeRunner(uuid)
590                         changed = true
591                 }
592         }
593         return
594 }
595
596 // caller must have lock.
597 func (wkr *worker) closeRunner(uuid string) {
598         rr := wkr.running[uuid]
599         if rr == nil {
600                 return
601         }
602         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
603         delete(wkr.running, uuid)
604         rr.Close()
605
606         now := time.Now()
607         wkr.updated = now
608         wkr.wp.exited[uuid] = now
609         if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
610                 wkr.state = StateIdle
611         }
612 }