lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "fmt"
        "path/filepath"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}
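
// exampleStateLabels is an illustrative sketch, not part of the original
// dispatcher code: it shows the labels produced by State.String and
// State.MarshalText -- the same strings appear as keys when a
// map[State]int (e.g. per-state instance counts) is encoded as JSON.
func exampleStateLabels() []string {
        var labels []string
        for _, s := range []State{StateUnknown, StateBooting, StateIdle, StateRunning, StateShutdown} {
                text, _ := s.MarshalText()
                labels = append(labels, string(text)) // "unknown", "booting", "idle", "running", "shutdown"
        }
        return labels
}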

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}
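
// exampleParseIdleBehavior is an illustrative sketch, not part of the
// original code: it shows how the validIdleBehavior map can guard an
// operator-supplied value (e.g. from a management request) before it is
// passed to setIdleBehavior.
func exampleParseIdleBehavior(s string) (IdleBehavior, error) {
        ib := IdleBehavior(s)
        if !validIdleBehavior[ib] {
                return "", fmt.Errorf("invalid idle behavior %q (want run, hold, or drain)", s)
        }
        return ib, nil
}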

type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx          sync.Locker // must be wp's Locker.
        state        State
        idleBehavior IdleBehavior
        instance     cloud.Instance
        instType     arvados.InstanceType
        vcpus        int64
        memory       int64
        appeared     time.Time
        probed       time.Time
        updated      time.Time
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
        running      map[string]*remoteRunner // remember to update state idle<->running when this changes
        starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing      chan struct{}
}

func (wkr *worker) onUnkillable(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        logger := wkr.logger.WithField("ContainerUUID", uuid)
        if wkr.idleBehavior == IdleBehaviorHold {
                logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
                return
        }
        logger.Warn("unkillable container, draining worker")
        wkr.setIdleBehavior(IdleBehaviorDrain)
}

func (wkr *worker) onKilled(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        wkr.closeRunner(uuid)
        go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger.Debug("starting container")
        rr := newRemoteRunner(ctr.UUID, wkr)
        wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
                rr.Start()
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}
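
// exampleSingleFlight is an illustrative sketch, not part of the original
// code: it isolates the pattern ProbeAndUpdate relies on. Assuming the
// slot channel is buffered with capacity 1 (as wkr.probing is presumably
// set up by the pool), a non-blocking send either claims the slot and runs
// the work, or fails immediately because a previous attempt is still in
// flight.
func exampleSingleFlight(slot chan struct{}, work func()) bool {
        select {
        case slot <- struct{}{}:
                defer func() { <-slot }() // release the slot when work finishes
                work()
                return true
        default:
                return false // another probe is already running
        }
}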

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        initialState := wkr.state
        wkr.mtx.Unlock()

        var (
                booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
        )

        switch initialState {
        case StateShutdown:
                return
        case StateIdle, StateRunning:
                booted = true
        case StateUnknown, StateBooting:
        default:
                panic(fmt.Sprintf("unknown state %s", initialState))
        }

        probeStart := time.Now()
        logger := wkr.logger.WithField("ProbeStart", probeStart)

        if !booted {
                booted, stderr = wkr.probeBooted()
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
                        booted = wkr.state == StateRunning || wkr.state == StateIdle
                        wkr.mtx.Unlock()
                }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
        if booted || wkr.state == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
                logger.Info("probe reported broken instance")
                wkr.setIdleBehavior(IdleBehaviorDrain)
        }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                // Using the start time of the probe as the timeout
                // threshold ensures we always initiate at least one
                // probe attempt after the boot/probe timeout expires
                // (otherwise, a slow probe failure could cause us to
                // shut down an instance even though it did in fact
                // boot/recover before the timeout expired).
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
                        // been logged already, but boot-probe
                        // failures are normal so they are logged only
                        // at Debug level. This is our chance to log
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
                                }).Info("boot failed")
                        }
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }

        changed := wkr.updateRunning(ctrUUIDs)

        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }

        // If wkr.state and wkr.running aren't changing then there's
        // no need to log anything, notify the scheduler, move state
        // back and forth between idle/running, etc.
        if !changed {
                return
        }

        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
        if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }

        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                return
        }
        ok = true
        for _, s := range strings.Split(string(stdout), "\n") {
                if s == "broken" {
                        reportsBroken = true
                } else if s != "" {
                        running = append(running, s)
                }
        }
        return
}
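
// exampleListOutput is an illustrative sketch, not part of the original
// code: it shows the kind of output probeRunning expects from
// "crunch-run --list" (the container UUID here is made up). Each non-empty
// line is a running container UUID, and a literal "broken" line means the
// instance reports itself unhealthy and should be drained.
func exampleListOutput() (running []string, reportsBroken bool) {
        stdout := "zzzzz-dz642-1234567890abcde\nbroken\n"
        for _, s := range strings.Split(stdout, "\n") {
                switch {
                case s == "broken":
                        reportsBroken = true
                case s != "":
                        running = append(running, s)
                }
        }
        // running == {"zzzzz-dz642-1234567890abcde"}, reportsBroken == true
        return
}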

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
                return false, stderr
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
        } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
                wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
                return false, stderr2
        } else {
                wkr.logger.Info("runner binary OK")
                stderr = append(stderr, stderr2...)
        }
        return true, stderr
}

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)

        // md5sum output is "<hex digest>  <path>\n" (two spaces); if the
        // installed binary already matches, skip the copy.
        stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
        if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
                return
        }

        // Note touch+chmod come before writing data, to avoid the
        // possibility of md5 being correct while file mode is
        // incorrect.
        cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
        if wkr.instance.RemoteUser() != "root" {
                cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
        }
        stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
        return
}
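
// exampleSudoQuote is an illustrative sketch, not part of the original
// code: it isolates the quoting trick copyRunnerData uses when the remote
// user is not root. Wrapping a command in `sudo sh -c '...'` adds one more
// level of shell parsing, so embedded single quotes are rewritten as '\''
// to survive it.
func exampleSudoQuote(cmd string) string {
        return `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
}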

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
        return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
        draining := wkr.idleBehavior == IdleBehaviorDrain
        switch wkr.state {
        case StateBooting:
                return draining
        case StateIdle:
                return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
        case StateRunning:
                if !draining {
                        return false
                }
                for _, rr := range wkr.running {
                        if !rr.givenup {
                                return false
                        }
                }
                for _, rr := range wkr.starting {
                        if !rr.givenup {
                                return false
                        }
                }
                // draining, and all remaining runners are just trying
                // to force-kill their crunch-run procs
                return true
        default:
                return false
        }
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if !wkr.eligibleForShutdown() {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
        wkr.shutdown()
        return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                        return
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
                wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
                wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {
                if tags[k] != v {
                        tags[k] = v
                        save = true
                }
        }
        if save {
                go func() {
                        err := instance.SetTags(tags)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
                }()
        }
}

func (wkr *worker) Close() {
        // This might take time, so do it after unlocking mtx.
        defer wkr.executor.Close()

        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
        alive := map[string]bool{}
        for _, uuid := range ctrUUIDs {
                alive[uuid] = true
                if _, ok := wkr.running[uuid]; ok {
                        // unchanged
                } else if rr, ok := wkr.starting[uuid]; ok {
                        wkr.running[uuid] = rr
                        delete(wkr.starting, uuid)
                        changed = true
                } else {
                        // We didn't start it -- it must have been
                        // started by a previous dispatcher process.
                        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
                        wkr.running[uuid] = newRemoteRunner(uuid, wkr)
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if !alive[uuid] {
                        wkr.closeRunner(uuid)
                        changed = true
                }
        }
        return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
        rr := wkr.running[uuid]
        if rr == nil {
                return
        }
        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
        delete(wkr.running, uuid)
        rr.Close()

        now := time.Now()
        wkr.updated = now
        wkr.wp.exited[uuid] = now
        if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                wkr.state = StateIdle
        }
}