// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "fmt"
        "path/filepath"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}
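
// Illustrative note, not part of the original source: because State
// implements encoding.TextMarshaler, encoding/json accepts State values
// as map keys and renders them by name rather than as integers. A
// minimal sketch, assuming the standard encoding/json package:
//
//      m := map[State]int{StateIdle: 3, StateRunning: 2}
//      buf, _ := json.Marshal(m) // buf == []byte(`{"idle":3,"running":2}`)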

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}
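
// Illustrative note, not part of the original source: validIdleBehavior
// lets callers reject an unrecognized value with a single map lookup,
// along these lines (hypothetical caller):
//
//      if !validIdleBehavior[IdleBehavior(s)] {
//              err = fmt.Errorf("invalid idle behavior %q", s)
//      }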

type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx          sync.Locker // must be wp's Locker.
        state        State
        idleBehavior IdleBehavior
        instance     cloud.Instance
        instType     arvados.InstanceType
        vcpus        int64
        memory       int64
        appeared     time.Time
        probed       time.Time
        updated      time.Time
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
        running      map[string]*remoteRunner // remember to update state idle<->running when this changes
        starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
        probing      chan struct{}
}

func (wkr *worker) onUnkillable(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        logger := wkr.logger.WithField("ContainerUUID", uuid)
        if wkr.idleBehavior == IdleBehaviorHold {
                logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
                return
        }
        logger.Warn("unkillable container, draining worker")
        wkr.setIdleBehavior(IdleBehaviorDrain)
}

func (wkr *worker) onKilled(uuid string) {
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        wkr.closeRunner(uuid)
        go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
        wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger.Debug("starting container")
        rr := newRemoteRunner(ctr.UUID, wkr)
        wkr.starting[ctr.UUID] = rr
        if wkr.state != StateRunning {
                wkr.state = StateRunning
                go wkr.wp.notify()
        }
        go func() {
                rr.Start()
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = rr
                wkr.lastUUID = ctr.UUID
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}
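
// exampleProbeGate is an illustrative sketch added for exposition; it is
// not part of the original source. ProbeAndUpdate uses wkr.probing as a
// try-lock: assuming the pool allocates it as a buffered channel with
// capacity 1, the non-blocking send lets at most one probe run at a time,
// and concurrent callers return immediately instead of queuing.
func exampleProbeGate(gate chan struct{}, probe func()) (ran bool) {
        select {
        case gate <- struct{}{}:
                // acquired the slot; release it when the probe finishes
                defer func() { <-gate }()
                probe()
                return true
        default:
                // another probe is already in flight; skip this one
                return false
        }
}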

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        initialState := wkr.state
        wkr.mtx.Unlock()

        var (
                booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
        )

        switch initialState {
        case StateShutdown:
                return
        case StateIdle, StateRunning:
                booted = true
        case StateUnknown, StateBooting:
        default:
                panic(fmt.Sprintf("unknown state %s", initialState))
        }

        probeStart := time.Now()
        logger := wkr.logger.WithField("ProbeStart", probeStart)

        if !booted {
                booted, stderr = wkr.probeBooted()
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
                        booted = wkr.state == StateRunning || wkr.state == StateIdle
                        wkr.mtx.Unlock()
                }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        reportedBroken := false
        if booted || wkr.state == StateUnknown {
                ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
                logger.Info("probe reported broken instance")
                // FIXME add prometheus tick
                wkr.setIdleBehavior(IdleBehaviorDrain)
        }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                // Using the start time of the probe as the timeout
                // threshold ensures we always initiate at least one
                // probe attempt after the boot/probe timeout expires
                // (otherwise, a slow probe failure could cause us to
                // shutdown an instance even though it did in fact
                // boot/recover before the timeout expired).
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
                        // been logged already, but boot-probe
                        // failures are normal so they are logged only
                        // at Debug level. This is our chance to log
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
                                }).Info("boot failed")
                        }
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }

        changed := wkr.updateRunning(ctrUUIDs)

        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }

        // If wkr.state and wkr.running aren't changing then there's
        // no need to log anything, notify the scheduler, move state
        // back and forth between idle/running, etc.
        if !changed {
                return
        }

        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
        if wkr.state == StateUnknown && changed {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }

        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(wkr.running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
}
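
// exampleNextState is an illustrative sketch added for exposition; it is
// not part of the original source. It restates the idle<->running
// reconciliation at the end of probeAndUpdate: once an instance has
// booted, its state simply tracks whether any containers are starting or
// running on it.
func exampleNextState(cur State, nStarting, nRunning int) State {
        switch {
        case cur == StateIdle && nStarting+nRunning > 0:
                return StateRunning
        case cur == StateRunning && nStarting+nRunning == 0:
                return StateIdle
        default:
                return cur
        }
}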

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                return
        }
        ok = true
        for _, s := range strings.Split(string(stdout), "\n") {
                if s == "broken" {
                        reportsBroken = true
                } else if s != "" {
                        running = append(running, s)
                }
        }
        return
}
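
// Illustrative note, not part of the original source: probeRunning invokes
// the configured runner command with --list and treats each non-empty
// output line as a container UUID, with the literal line "broken" flagging
// a broken instance. For hypothetical output like
//
//      zzzzz-dz642-000000000000000
//      broken
//
// it would return running=["zzzzz-dz642-000000000000000"],
// reportsBroken=true, ok=true.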

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        if err = wkr.wp.loadRunnerData(); err != nil {
                wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
                return false, stderr
        } else if len(wkr.wp.runnerData) == 0 {
                // Assume crunch-run is already installed
        } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
                wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
                return false, stderr2
        } else {
                stderr = append(stderr, stderr2...)
        }
        return true, stderr
}

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
        hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
        dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
        logger := wkr.logger.WithFields(logrus.Fields{
                "hash": hash,
                "path": wkr.wp.runnerCmd,
        })

        stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
        if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
                logger.Info("runner binary already exists on worker, with correct hash")
                return
        }

        // Note touch+chmod come before writing data, to avoid the
        // possibility of md5 being correct while file mode is
        // incorrect.
        cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
        if wkr.instance.RemoteUser() != "root" {
                cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
        }
        logger.WithField("cmd", cmd).Info("installing runner binary on worker")
        stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
        return
}
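
// exampleSudoWrap is an illustrative sketch added for exposition; it is
// not part of the original source. It isolates the quoting used in
// copyRunnerData when the remote user is not root: each single quote in
// the script is rewritten as '\'' so the whole command survives being
// wrapped in single quotes for "sudo sh -c".
func exampleSudoWrap(script string) string {
        return `sudo sh -c '` + strings.Replace(script, "'", "'\\''", -1) + `'`
}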

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
        return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
        draining := wkr.idleBehavior == IdleBehaviorDrain
        switch wkr.state {
        case StateBooting:
                return draining
        case StateIdle:
                return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
        case StateRunning:
                if !draining {
                        return false
                }
                for _, rr := range wkr.running {
                        if !rr.givenup {
                                return false
                        }
                }
                for _, rr := range wkr.starting {
                        if !rr.givenup {
                                return false
                        }
                }
                // draining, and all remaining runners are just trying
                // to force-kill their crunch-run procs
                return true
        default:
                return false
        }
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if !wkr.eligibleForShutdown() {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
                "IdleDuration": stats.Duration(time.Since(wkr.busy)),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown worker")
        wkr.shutdown()
        return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                        return
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        tags := instance.Tags()
        update := cloud.InstanceTags{
                wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
                wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        save := false
        for k, v := range update {
                if tags[k] != v {
                        tags[k] = v
                        save = true
                }
        }
        if save {
                go func() {
                        err := instance.SetTags(tags)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
                }()
        }
}

func (wkr *worker) Close() {
        // This might take time, so do it after unlocking mtx.
        defer wkr.executor.Close()

        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        for uuid, rr := range wkr.running {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
        for uuid, rr := range wkr.starting {
                wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
                rr.Close()
        }
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
        alive := map[string]bool{}
        for _, uuid := range ctrUUIDs {
                alive[uuid] = true
                if _, ok := wkr.running[uuid]; ok {
                        // unchanged
                } else if rr, ok := wkr.starting[uuid]; ok {
                        wkr.running[uuid] = rr
                        delete(wkr.starting, uuid)
                        changed = true
                } else {
                        // We didn't start it -- it must have been
                        // started by a previous dispatcher process.
                        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
                        wkr.running[uuid] = newRemoteRunner(uuid, wkr)
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if !alive[uuid] {
                        wkr.closeRunner(uuid)
                        changed = true
                }
        }
        return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
        rr := wkr.running[uuid]
        if rr == nil {
                return
        }
        wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
        delete(wkr.running, uuid)
        rr.Close()

        now := time.Now()
        wkr.updated = now
        wkr.wp.exited[uuid] = now
        if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                wkr.state = StateIdle
        }
}