Merge branch '15759-deploy-crunch-run'
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "fmt"
10         "path/filepath"
11         "strings"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/lib/cloud"
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/stats"
18         "github.com/sirupsen/logrus"
19 )
20
21 const (
22         // TODO: configurable
23         maxPingFailTime = 10 * time.Minute
24 )
25
26 // State indicates whether a worker is available to do work, and (if
27 // not) whether/when it is expected to become ready.
28 type State int
29
30 const (
31         StateUnknown  State = iota // might be running a container already
32         StateBooting               // instance is booting
33         StateIdle                  // instance booted, no containers are running
34         StateRunning               // instance is running one or more containers
35         StateShutdown              // worker has stopped monitoring the instance
36 )
37
38 var stateString = map[State]string{
39         StateUnknown:  "unknown",
40         StateBooting:  "booting",
41         StateIdle:     "idle",
42         StateRunning:  "running",
43         StateShutdown: "shutdown",
44 }
45
46 // String implements fmt.Stringer.
47 func (s State) String() string {
48         return stateString[s]
49 }
50
51 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
52 // map[State]anything uses the state's string representation.
53 func (s State) MarshalText() ([]byte, error) {
54         return []byte(stateString[s]), nil
55 }
56
57 // IdleBehavior indicates the behavior desired when a node becomes idle.
58 type IdleBehavior string
59
60 const (
61         IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
62         IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
63         IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
64 )
65
66 var validIdleBehavior = map[IdleBehavior]bool{
67         IdleBehaviorRun:   true,
68         IdleBehaviorHold:  true,
69         IdleBehaviorDrain: true,
70 }
71
72 type worker struct {
73         logger   logrus.FieldLogger
74         executor Executor
75         wp       *Pool
76
77         mtx          sync.Locker // must be wp's Locker.
78         state        State
79         idleBehavior IdleBehavior
80         instance     cloud.Instance
81         instType     arvados.InstanceType
82         vcpus        int64
83         memory       int64
84         appeared     time.Time
85         probed       time.Time
86         updated      time.Time
87         busy         time.Time
88         destroyed    time.Time
89         lastUUID     string
90         running      map[string]*remoteRunner // remember to update state idle<->running when this changes
91         starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
92         probing      chan struct{}
93 }
94
95 func (wkr *worker) onUnkillable(uuid string) {
96         wkr.mtx.Lock()
97         defer wkr.mtx.Unlock()
98         logger := wkr.logger.WithField("ContainerUUID", uuid)
99         if wkr.idleBehavior == IdleBehaviorHold {
100                 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
101                 return
102         }
103         logger.Warn("unkillable container, draining worker")
104         wkr.setIdleBehavior(IdleBehaviorDrain)
105 }
106
107 func (wkr *worker) onKilled(uuid string) {
108         wkr.mtx.Lock()
109         defer wkr.mtx.Unlock()
110         wkr.closeRunner(uuid)
111         go wkr.wp.notify()
112 }
113
114 // caller must have lock.
115 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
116         wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
117         wkr.idleBehavior = idleBehavior
118         wkr.saveTags()
119         wkr.shutdownIfIdle()
120 }
121
122 // caller must have lock.
123 func (wkr *worker) startContainer(ctr arvados.Container) {
124         logger := wkr.logger.WithFields(logrus.Fields{
125                 "ContainerUUID": ctr.UUID,
126                 "Priority":      ctr.Priority,
127         })
128         logger.Debug("starting container")
129         rr := newRemoteRunner(ctr.UUID, wkr)
130         wkr.starting[ctr.UUID] = rr
131         if wkr.state != StateRunning {
132                 wkr.state = StateRunning
133                 go wkr.wp.notify()
134         }
135         go func() {
136                 rr.Start()
137                 wkr.mtx.Lock()
138                 defer wkr.mtx.Unlock()
139                 now := time.Now()
140                 wkr.updated = now
141                 wkr.busy = now
142                 delete(wkr.starting, ctr.UUID)
143                 wkr.running[ctr.UUID] = rr
144                 wkr.lastUUID = ctr.UUID
145         }()
146 }
147
148 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
149 // for the worker's curent state. If a previous probe is still
150 // running, it does nothing.
151 //
152 // It should be called in a new goroutine.
153 func (wkr *worker) ProbeAndUpdate() {
154         select {
155         case wkr.probing <- struct{}{}:
156                 wkr.probeAndUpdate()
157                 <-wkr.probing
158         default:
159                 wkr.logger.Debug("still waiting for last probe to finish")
160         }
161 }
162
163 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
164 // updates state accordingly.
165 //
166 // In StateUnknown: Call both probeBooted and probeRunning.
167 // In StateBooting: Call probeBooted; if successful, call probeRunning.
168 // In StateRunning: Call probeRunning.
169 // In StateIdle: Call probeRunning.
170 // In StateShutdown: Do nothing.
171 //
172 // If both probes succeed, wkr.state changes to
173 // StateIdle/StateRunning.
174 //
175 // If probeRunning succeeds, wkr.running is updated. (This means
176 // wkr.running might be non-empty even in StateUnknown, if the boot
177 // probe failed.)
178 //
179 // probeAndUpdate should be called in a new goroutine.
180 func (wkr *worker) probeAndUpdate() {
181         wkr.mtx.Lock()
182         updated := wkr.updated
183         initialState := wkr.state
184         wkr.mtx.Unlock()
185
186         var (
187                 booted   bool
188                 ctrUUIDs []string
189                 ok       bool
190                 stderr   []byte // from probeBooted
191         )
192
193         switch initialState {
194         case StateShutdown:
195                 return
196         case StateIdle, StateRunning:
197                 booted = true
198         case StateUnknown, StateBooting:
199         default:
200                 panic(fmt.Sprintf("unknown state %s", initialState))
201         }
202
203         probeStart := time.Now()
204         logger := wkr.logger.WithField("ProbeStart", probeStart)
205
206         if !booted {
207                 booted, stderr = wkr.probeBooted()
208                 if !booted {
209                         // Pretend this probe succeeded if another
210                         // concurrent attempt succeeded.
211                         wkr.mtx.Lock()
212                         booted = wkr.state == StateRunning || wkr.state == StateIdle
213                         wkr.mtx.Unlock()
214                 }
215                 if booted {
216                         logger.Info("instance booted; will try probeRunning")
217                 }
218         }
219         reportedBroken := false
220         if booted || wkr.state == StateUnknown {
221                 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
222         }
223         wkr.mtx.Lock()
224         defer wkr.mtx.Unlock()
225         if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
226                 logger.Info("probe reported broken instance")
227                 wkr.setIdleBehavior(IdleBehaviorDrain)
228         }
229         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
230                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
231                         // Skip the logging noise if shutdown was
232                         // initiated during probe.
233                         return
234                 }
235                 // Using the start time of the probe as the timeout
236                 // threshold ensures we always initiate at least one
237                 // probe attempt after the boot/probe timeout expires
238                 // (otherwise, a slow probe failure could cause us to
239                 // shutdown an instance even though it did in fact
240                 // boot/recover before the timeout expired).
241                 dur := probeStart.Sub(wkr.probed)
242                 if wkr.shutdownIfBroken(dur) {
243                         // stderr from failed run-probes will have
244                         // been logged already, but boot-probe
245                         // failures are normal so they are logged only
246                         // at Debug level. This is our chance to log
247                         // some evidence about why the node never
248                         // booted, even in non-debug mode.
249                         if !booted {
250                                 logger.WithFields(logrus.Fields{
251                                         "Duration": dur,
252                                         "stderr":   string(stderr),
253                                 }).Info("boot failed")
254                         }
255                 }
256                 return
257         }
258
259         updateTime := time.Now()
260         wkr.probed = updateTime
261
262         if updated != wkr.updated {
263                 // Worker was updated after the probe began, so
264                 // wkr.running might have a container UUID that was
265                 // not yet running when ctrUUIDs was generated. Leave
266                 // wkr.running alone and wait for the next probe to
267                 // catch up on any changes.
268                 return
269         }
270
271         if len(ctrUUIDs) > 0 {
272                 wkr.busy = updateTime
273                 wkr.lastUUID = ctrUUIDs[0]
274         } else if len(wkr.running) > 0 {
275                 // Actual last-busy time was sometime between wkr.busy
276                 // and now. Now is the earliest opportunity to take
277                 // advantage of the non-busy state, though.
278                 wkr.busy = updateTime
279         }
280
281         changed := wkr.updateRunning(ctrUUIDs)
282
283         // Update state if this was the first successful boot-probe.
284         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
285                 // Note: this will change again below if
286                 // len(wkr.starting)+len(wkr.running) > 0.
287                 wkr.state = StateIdle
288                 changed = true
289         }
290
291         // If wkr.state and wkr.running aren't changing then there's
292         // no need to log anything, notify the scheduler, move state
293         // back and forth between idle/running, etc.
294         if !changed {
295                 return
296         }
297
298         // Log whenever a run-probe reveals crunch-run processes
299         // appearing/disappearing before boot-probe succeeds.
300         if wkr.state == StateUnknown && changed {
301                 logger.WithFields(logrus.Fields{
302                         "RunningContainers": len(wkr.running),
303                         "State":             wkr.state,
304                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
305         }
306
307         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
308                 wkr.state = StateRunning
309         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
310                 wkr.state = StateIdle
311         }
312         wkr.updated = updateTime
313         if booted && (initialState == StateUnknown || initialState == StateBooting) {
314                 logger.WithFields(logrus.Fields{
315                         "RunningContainers": len(wkr.running),
316                         "State":             wkr.state,
317                 }).Info("probes succeeded, instance is in service")
318         }
319         go wkr.wp.notify()
320 }
321
322 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
323         cmd := wkr.wp.runnerCmd + " --list"
324         if u := wkr.instance.RemoteUser(); u != "root" {
325                 cmd = "sudo " + cmd
326         }
327         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
328         if err != nil {
329                 wkr.logger.WithFields(logrus.Fields{
330                         "Command": cmd,
331                         "stdout":  string(stdout),
332                         "stderr":  string(stderr),
333                 }).WithError(err).Warn("probe failed")
334                 return
335         }
336         ok = true
337         for _, s := range strings.Split(string(stdout), "\n") {
338                 if s == "broken" {
339                         reportsBroken = true
340                 } else if s != "" {
341                         running = append(running, s)
342                 }
343         }
344         return
345 }
346
347 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
348         cmd := wkr.wp.bootProbeCommand
349         if cmd == "" {
350                 cmd = "true"
351         }
352         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
353         logger := wkr.logger.WithFields(logrus.Fields{
354                 "Command": cmd,
355                 "stdout":  string(stdout),
356                 "stderr":  string(stderr),
357         })
358         if err != nil {
359                 logger.WithError(err).Debug("boot probe failed")
360                 return false, stderr
361         }
362         logger.Info("boot probe succeeded")
363         if err = wkr.wp.loadRunnerData(); err != nil {
364                 wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
365                 return false, stderr
366         } else if len(wkr.wp.runnerData) == 0 {
367                 // Assume crunch-run is already installed
368         } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
369                 wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
370                 return false, stderr2
371         } else {
372                 stderr = append(stderr, stderr2...)
373         }
374         return true, stderr
375 }
376
377 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
378         hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
379         dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
380         logger := wkr.logger.WithFields(logrus.Fields{
381                 "hash": hash,
382                 "path": wkr.wp.runnerCmd,
383         })
384
385         stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
386         if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
387                 logger.Info("runner binary already exists on worker, with correct hash")
388                 return
389         }
390
391         // Note touch+chmod come before writing data, to avoid the
392         // possibility of md5 being correct while file mode is
393         // incorrect.
394         cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
395         if wkr.instance.RemoteUser() != "root" {
396                 cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
397         }
398         logger.WithField("cmd", cmd).Info("installing runner binary on worker")
399         stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
400         return
401 }
402
403 // caller must have lock.
404 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
405         if wkr.idleBehavior == IdleBehaviorHold {
406                 // Never shut down.
407                 return false
408         }
409         label, threshold := "", wkr.wp.timeoutProbe
410         if wkr.state == StateUnknown || wkr.state == StateBooting {
411                 label, threshold = "new ", wkr.wp.timeoutBooting
412         }
413         if dur < threshold {
414                 return false
415         }
416         wkr.logger.WithFields(logrus.Fields{
417                 "Duration": dur,
418                 "Since":    wkr.probed,
419                 "State":    wkr.state,
420         }).Warnf("%sinstance unresponsive, shutting down", label)
421         wkr.shutdown()
422         return true
423 }
424
425 // Returns true if the instance is eligible for shutdown: either it's
426 // been idle too long, or idleBehavior=Drain and nothing is running.
427 //
428 // caller must have lock.
429 func (wkr *worker) eligibleForShutdown() bool {
430         if wkr.idleBehavior == IdleBehaviorHold {
431                 return false
432         }
433         draining := wkr.idleBehavior == IdleBehaviorDrain
434         switch wkr.state {
435         case StateBooting:
436                 return draining
437         case StateIdle:
438                 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
439         case StateRunning:
440                 if !draining {
441                         return false
442                 }
443                 for _, rr := range wkr.running {
444                         if !rr.givenup {
445                                 return false
446                         }
447                 }
448                 for _, rr := range wkr.starting {
449                         if !rr.givenup {
450                                 return false
451                         }
452                 }
453                 // draining, and all remaining runners are just trying
454                 // to force-kill their crunch-run procs
455                 return true
456         default:
457                 return false
458         }
459 }
460
461 // caller must have lock.
462 func (wkr *worker) shutdownIfIdle() bool {
463         if !wkr.eligibleForShutdown() {
464                 return false
465         }
466         wkr.logger.WithFields(logrus.Fields{
467                 "State":        wkr.state,
468                 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
469                 "IdleBehavior": wkr.idleBehavior,
470         }).Info("shutdown worker")
471         wkr.shutdown()
472         return true
473 }
474
475 // caller must have lock.
476 func (wkr *worker) shutdown() {
477         now := time.Now()
478         wkr.updated = now
479         wkr.destroyed = now
480         wkr.state = StateShutdown
481         go wkr.wp.notify()
482         go func() {
483                 err := wkr.instance.Destroy()
484                 if err != nil {
485                         wkr.logger.WithError(err).Warn("shutdown failed")
486                         return
487                 }
488         }()
489 }
490
491 // Save worker tags to cloud provider metadata, if they don't already
492 // match. Caller must have lock.
493 func (wkr *worker) saveTags() {
494         instance := wkr.instance
495         tags := instance.Tags()
496         update := cloud.InstanceTags{
497                 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
498                 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
499         }
500         save := false
501         for k, v := range update {
502                 if tags[k] != v {
503                         tags[k] = v
504                         save = true
505                 }
506         }
507         if save {
508                 go func() {
509                         err := instance.SetTags(tags)
510                         if err != nil {
511                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
512                         }
513                 }()
514         }
515 }
516
517 func (wkr *worker) Close() {
518         // This might take time, so do it after unlocking mtx.
519         defer wkr.executor.Close()
520
521         wkr.mtx.Lock()
522         defer wkr.mtx.Unlock()
523         for uuid, rr := range wkr.running {
524                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
525                 rr.Close()
526         }
527         for uuid, rr := range wkr.starting {
528                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
529                 rr.Close()
530         }
531 }
532
533 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
534 // probe. Returns true if anything was added or removed.
535 //
536 // Caller must have lock.
537 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
538         alive := map[string]bool{}
539         for _, uuid := range ctrUUIDs {
540                 alive[uuid] = true
541                 if _, ok := wkr.running[uuid]; ok {
542                         // unchanged
543                 } else if rr, ok := wkr.starting[uuid]; ok {
544                         wkr.running[uuid] = rr
545                         delete(wkr.starting, uuid)
546                         changed = true
547                 } else {
548                         // We didn't start it -- it must have been
549                         // started by a previous dispatcher process.
550                         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
551                         wkr.running[uuid] = newRemoteRunner(uuid, wkr)
552                         changed = true
553                 }
554         }
555         for uuid := range wkr.running {
556                 if !alive[uuid] {
557                         wkr.closeRunner(uuid)
558                         changed = true
559                 }
560         }
561         return
562 }
563
564 // caller must have lock.
565 func (wkr *worker) closeRunner(uuid string) {
566         rr := wkr.running[uuid]
567         if rr == nil {
568                 return
569         }
570         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
571         delete(wkr.running, uuid)
572         rr.Close()
573
574         now := time.Now()
575         wkr.updated = now
576         wkr.wp.exited[uuid] = now
577         if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
578                 wkr.state = StateIdle
579         }
580 }