arvados.git/lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

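// worker tracks the dispatcher's view of a single cloud instance: its
// lifecycle state, idle behavior, the containers starting/running on it,
// and the timestamps used for probe- and idle-timeout decisions. Mutable
// fields are protected by mtx, which is shared with the owning Pool.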
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	vcpus        int64
	memory       int64
	appeared     time.Time
	probed       time.Time
	updated      time.Time
	busy         time.Time
	destroyed    time.Time
	lastUUID     string
	running      map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing      chan struct{}
}

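// onUnkillable is called when the crunch-run process for a container
// cannot be killed. Unless the worker is held, it is switched to
// IdleBehaviorDrain so no new work is scheduled on the instance.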
func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}

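// onKilled is called when the crunch-run process for a container has
// ended. The runner is closed and removed from wkr.running, and the
// pool is notified.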
func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}

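// setIdleBehavior updates the worker's idle behavior, saves it to the
// instance's cloud tags, and shuts the instance down right away if the
// new behavior makes it eligible for shutdown.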
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}

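// startContainer creates a remoteRunner for the given container and
// starts it in a new goroutine, moving the worker to StateRunning and
// tracking the container under wkr.starting until crunch-run has
// actually been launched.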
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
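	// The send on wkr.probing succeeds only when no other probe is
	// in flight, so concurrent calls collapse into a single probe;
	// the slot is released once probeAndUpdate returns.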
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shut down an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

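// probeRunning runs `crunch-run --list` on the instance (via sudo when
// the remote user is not root) and returns the container UUIDs it
// reports; ok is false if the command could not be run.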
func (wkr *worker) probeRunning() (running []string, ok bool) {
	cmd := "crunch-run --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true
	}
	return strings.Split(string(stdout), "\n"), true
}

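// probeBooted runs the configured boot probe command (or "true" if none
// is configured) on the instance, and reports whether it succeeded along
// with its stderr for logging.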
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}

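// shutdownIfBroken shuts the instance down if it has gone too long
// without a successful probe (the boot timeout for new instances, the
// probe timeout otherwise), and reports whether it did so. Instances
// with IdleBehaviorHold are never shut down here.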
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// Returns true if the instance is eligible for shutdown: it has been
// idle too long, or idleBehavior=Drain and any remaining runners have
// already given up.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}

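// shutdownIfIdle shuts the worker down if eligibleForShutdown says it
// may be shut down, and reports whether it did so.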
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.shutdown()
	return true
}

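// shutdown marks the worker as destroyed, notifies the pool, and
// destroys the cloud instance in a separate goroutine.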
// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		tagKeyInstanceType: wkr.instType.Name,
		tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}

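// Close abandons any containers still tracked by this worker and closes
// the executor's connection to the instance.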
func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}

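// closeRunner stops tracking the given container: the runner is closed
// and removed from wkr.running, its exit time is recorded in the pool,
// and the worker drops back to StateIdle if nothing else is running or
// starting.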
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}