Merge branch '15295-keep-ref-check' closes #15295
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "fmt"
9         "strings"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/lib/cloud"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "git.curoverse.com/arvados.git/sdk/go/stats"
16         "github.com/sirupsen/logrus"
17 )
18
// maxPingFailTime is a fixed ten-minute limit.
//
// NOTE(review): judging by the name this is presumably how long pings
// may keep failing before the worker is given up on; it is not
// referenced in this part of the file -- confirm against its users.
//
// TODO: configurable
const maxPingFailTime = 10 * time.Minute
23
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

// stateString maps each declared State to its textual name.
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
//
// Declared states return their name from stateString. Any other value
// is rendered as "State(N)" instead of an empty string, so that log
// and panic messages still identify the offending value.
func (s State) String() string {
	if name, ok := stateString[s]; ok {
		return name
	}
	return fmt.Sprintf("State(%d)", int(s))
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation. The
// returned error is always nil.
func (s State) MarshalText() ([]byte, error) {
	return []byte(s.String()), nil
}
54
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	// IdleBehaviorRun: run containers, or shutdown on idle timeout.
	IdleBehaviorRun IdleBehavior = "run"
	// IdleBehaviorHold: don't shutdown or run more containers.
	IdleBehaviorHold IdleBehavior = "hold"
	// IdleBehaviorDrain: shutdown immediately when idle.
	IdleBehaviorDrain IdleBehavior = "drain"
)

// validIdleBehavior is the set of recognized IdleBehavior values;
// looking up any other value yields false.
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorDrain: true,
	IdleBehaviorHold:  true,
	IdleBehaviorRun:   true,
}
69
70 type worker struct {
71         logger   logrus.FieldLogger
72         executor Executor
73         wp       *Pool
74
75         mtx          sync.Locker // must be wp's Locker.
76         state        State
77         idleBehavior IdleBehavior
78         instance     cloud.Instance
79         instType     arvados.InstanceType
80         vcpus        int64
81         memory       int64
82         appeared     time.Time
83         probed       time.Time
84         updated      time.Time
85         busy         time.Time
86         destroyed    time.Time
87         lastUUID     string
88         running      map[string]*remoteRunner // remember to update state idle<->running when this changes
89         starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
90         probing      chan struct{}
91 }
92
93 func (wkr *worker) onUnkillable(uuid string) {
94         wkr.mtx.Lock()
95         defer wkr.mtx.Unlock()
96         logger := wkr.logger.WithField("ContainerUUID", uuid)
97         if wkr.idleBehavior == IdleBehaviorHold {
98                 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
99                 return
100         }
101         logger.Warn("unkillable container, draining worker")
102         wkr.setIdleBehavior(IdleBehaviorDrain)
103 }
104
// onKilled removes the given container's runner from the running set
// (see closeRunner) and then notifies the pool asynchronously.
//
// It acquires wkr.mtx itself, so the caller must not hold the lock.
func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	// Notify from a separate goroutine while wkr.mtx is still held.
	// NOTE(review): presumably because notify must not run under the
	// lock -- confirm against Pool.notify.
	go wkr.wp.notify()
}
111
112 // caller must have lock.
113 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
114         wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
115         wkr.idleBehavior = idleBehavior
116         wkr.saveTags()
117         wkr.shutdownIfIdle()
118 }
119
120 // caller must have lock.
121 func (wkr *worker) startContainer(ctr arvados.Container) {
122         logger := wkr.logger.WithFields(logrus.Fields{
123                 "ContainerUUID": ctr.UUID,
124                 "Priority":      ctr.Priority,
125         })
126         logger.Debug("starting container")
127         rr := newRemoteRunner(ctr.UUID, wkr)
128         wkr.starting[ctr.UUID] = rr
129         if wkr.state != StateRunning {
130                 wkr.state = StateRunning
131                 go wkr.wp.notify()
132         }
133         go func() {
134                 rr.Start()
135                 wkr.mtx.Lock()
136                 defer wkr.mtx.Unlock()
137                 now := time.Now()
138                 wkr.updated = now
139                 wkr.busy = now
140                 delete(wkr.starting, ctr.UUID)
141                 wkr.running[ctr.UUID] = rr
142                 wkr.lastUUID = ctr.UUID
143         }()
144 }
145
146 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
147 // for the worker's curent state. If a previous probe is still
148 // running, it does nothing.
149 //
150 // It should be called in a new goroutine.
151 func (wkr *worker) ProbeAndUpdate() {
152         select {
153         case wkr.probing <- struct{}{}:
154                 wkr.probeAndUpdate()
155                 <-wkr.probing
156         default:
157                 wkr.logger.Debug("still waiting for last probe to finish")
158         }
159 }
160
161 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
162 // updates state accordingly.
163 //
164 // In StateUnknown: Call both probeBooted and probeRunning.
165 // In StateBooting: Call probeBooted; if successful, call probeRunning.
166 // In StateRunning: Call probeRunning.
167 // In StateIdle: Call probeRunning.
168 // In StateShutdown: Do nothing.
169 //
170 // If both probes succeed, wkr.state changes to
171 // StateIdle/StateRunning.
172 //
173 // If probeRunning succeeds, wkr.running is updated. (This means
174 // wkr.running might be non-empty even in StateUnknown, if the boot
175 // probe failed.)
176 //
177 // probeAndUpdate should be called in a new goroutine.
178 func (wkr *worker) probeAndUpdate() {
179         wkr.mtx.Lock()
180         updated := wkr.updated
181         initialState := wkr.state
182         wkr.mtx.Unlock()
183
184         var (
185                 booted   bool
186                 ctrUUIDs []string
187                 ok       bool
188                 stderr   []byte // from probeBooted
189         )
190
191         switch initialState {
192         case StateShutdown:
193                 return
194         case StateIdle, StateRunning:
195                 booted = true
196         case StateUnknown, StateBooting:
197         default:
198                 panic(fmt.Sprintf("unknown state %s", initialState))
199         }
200
201         probeStart := time.Now()
202         logger := wkr.logger.WithField("ProbeStart", probeStart)
203
204         if !booted {
205                 booted, stderr = wkr.probeBooted()
206                 if !booted {
207                         // Pretend this probe succeeded if another
208                         // concurrent attempt succeeded.
209                         wkr.mtx.Lock()
210                         booted = wkr.state == StateRunning || wkr.state == StateIdle
211                         wkr.mtx.Unlock()
212                 }
213                 if booted {
214                         logger.Info("instance booted; will try probeRunning")
215                 }
216         }
217         reportedBroken := false
218         if booted || wkr.state == StateUnknown {
219                 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
220         }
221         wkr.mtx.Lock()
222         defer wkr.mtx.Unlock()
223         if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
224                 logger.Info("probe reported broken instance")
225                 wkr.setIdleBehavior(IdleBehaviorDrain)
226         }
227         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
228                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
229                         // Skip the logging noise if shutdown was
230                         // initiated during probe.
231                         return
232                 }
233                 // Using the start time of the probe as the timeout
234                 // threshold ensures we always initiate at least one
235                 // probe attempt after the boot/probe timeout expires
236                 // (otherwise, a slow probe failure could cause us to
237                 // shutdown an instance even though it did in fact
238                 // boot/recover before the timeout expired).
239                 dur := probeStart.Sub(wkr.probed)
240                 if wkr.shutdownIfBroken(dur) {
241                         // stderr from failed run-probes will have
242                         // been logged already, but boot-probe
243                         // failures are normal so they are logged only
244                         // at Debug level. This is our chance to log
245                         // some evidence about why the node never
246                         // booted, even in non-debug mode.
247                         if !booted {
248                                 logger.WithFields(logrus.Fields{
249                                         "Duration": dur,
250                                         "stderr":   string(stderr),
251                                 }).Info("boot failed")
252                         }
253                 }
254                 return
255         }
256
257         updateTime := time.Now()
258         wkr.probed = updateTime
259
260         if updated != wkr.updated {
261                 // Worker was updated after the probe began, so
262                 // wkr.running might have a container UUID that was
263                 // not yet running when ctrUUIDs was generated. Leave
264                 // wkr.running alone and wait for the next probe to
265                 // catch up on any changes.
266                 return
267         }
268
269         if len(ctrUUIDs) > 0 {
270                 wkr.busy = updateTime
271                 wkr.lastUUID = ctrUUIDs[0]
272         } else if len(wkr.running) > 0 {
273                 // Actual last-busy time was sometime between wkr.busy
274                 // and now. Now is the earliest opportunity to take
275                 // advantage of the non-busy state, though.
276                 wkr.busy = updateTime
277         }
278
279         changed := wkr.updateRunning(ctrUUIDs)
280
281         // Update state if this was the first successful boot-probe.
282         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
283                 // Note: this will change again below if
284                 // len(wkr.starting)+len(wkr.running) > 0.
285                 wkr.state = StateIdle
286                 changed = true
287         }
288
289         // If wkr.state and wkr.running aren't changing then there's
290         // no need to log anything, notify the scheduler, move state
291         // back and forth between idle/running, etc.
292         if !changed {
293                 return
294         }
295
296         // Log whenever a run-probe reveals crunch-run processes
297         // appearing/disappearing before boot-probe succeeds.
298         if wkr.state == StateUnknown && changed {
299                 logger.WithFields(logrus.Fields{
300                         "RunningContainers": len(wkr.running),
301                         "State":             wkr.state,
302                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
303         }
304
305         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
306                 wkr.state = StateRunning
307         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
308                 wkr.state = StateIdle
309         }
310         wkr.updated = updateTime
311         if booted && (initialState == StateUnknown || initialState == StateBooting) {
312                 logger.WithFields(logrus.Fields{
313                         "RunningContainers": len(wkr.running),
314                         "State":             wkr.state,
315                 }).Info("probes succeeded, instance is in service")
316         }
317         go wkr.wp.notify()
318 }
319
320 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
321         cmd := "crunch-run --list"
322         if u := wkr.instance.RemoteUser(); u != "root" {
323                 cmd = "sudo " + cmd
324         }
325         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
326         if err != nil {
327                 wkr.logger.WithFields(logrus.Fields{
328                         "Command": cmd,
329                         "stdout":  string(stdout),
330                         "stderr":  string(stderr),
331                 }).WithError(err).Warn("probe failed")
332                 return
333         }
334         ok = true
335         for _, s := range strings.Split(string(stdout), "\n") {
336                 if s == "broken" {
337                         reportsBroken = true
338                 } else if s != "" {
339                         running = append(running, s)
340                 }
341         }
342         return
343 }
344
345 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
346         cmd := wkr.wp.bootProbeCommand
347         if cmd == "" {
348                 cmd = "true"
349         }
350         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
351         logger := wkr.logger.WithFields(logrus.Fields{
352                 "Command": cmd,
353                 "stdout":  string(stdout),
354                 "stderr":  string(stderr),
355         })
356         if err != nil {
357                 logger.WithError(err).Debug("boot probe failed")
358                 return false, stderr
359         }
360         logger.Info("boot probe succeeded")
361         return true, stderr
362 }
363
364 // caller must have lock.
365 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
366         if wkr.idleBehavior == IdleBehaviorHold {
367                 // Never shut down.
368                 return false
369         }
370         label, threshold := "", wkr.wp.timeoutProbe
371         if wkr.state == StateUnknown || wkr.state == StateBooting {
372                 label, threshold = "new ", wkr.wp.timeoutBooting
373         }
374         if dur < threshold {
375                 return false
376         }
377         wkr.logger.WithFields(logrus.Fields{
378                 "Duration": dur,
379                 "Since":    wkr.probed,
380                 "State":    wkr.state,
381         }).Warnf("%sinstance unresponsive, shutting down", label)
382         wkr.shutdown()
383         return true
384 }
385
386 // Returns true if the instance is eligible for shutdown: either it's
387 // been idle too long, or idleBehavior=Drain and nothing is running.
388 //
389 // caller must have lock.
390 func (wkr *worker) eligibleForShutdown() bool {
391         if wkr.idleBehavior == IdleBehaviorHold {
392                 return false
393         }
394         draining := wkr.idleBehavior == IdleBehaviorDrain
395         switch wkr.state {
396         case StateBooting:
397                 return draining
398         case StateIdle:
399                 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
400         case StateRunning:
401                 if !draining {
402                         return false
403                 }
404                 for _, rr := range wkr.running {
405                         if !rr.givenup {
406                                 return false
407                         }
408                 }
409                 for _, rr := range wkr.starting {
410                         if !rr.givenup {
411                                 return false
412                         }
413                 }
414                 // draining, and all remaining runners are just trying
415                 // to force-kill their crunch-run procs
416                 return true
417         default:
418                 return false
419         }
420 }
421
422 // caller must have lock.
423 func (wkr *worker) shutdownIfIdle() bool {
424         if !wkr.eligibleForShutdown() {
425                 return false
426         }
427         wkr.logger.WithFields(logrus.Fields{
428                 "State":        wkr.state,
429                 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
430                 "IdleBehavior": wkr.idleBehavior,
431         }).Info("shutdown worker")
432         wkr.shutdown()
433         return true
434 }
435
436 // caller must have lock.
437 func (wkr *worker) shutdown() {
438         now := time.Now()
439         wkr.updated = now
440         wkr.destroyed = now
441         wkr.state = StateShutdown
442         go wkr.wp.notify()
443         go func() {
444                 err := wkr.instance.Destroy()
445                 if err != nil {
446                         wkr.logger.WithError(err).Warn("shutdown failed")
447                         return
448                 }
449         }()
450 }
451
452 // Save worker tags to cloud provider metadata, if they don't already
453 // match. Caller must have lock.
454 func (wkr *worker) saveTags() {
455         instance := wkr.instance
456         tags := instance.Tags()
457         update := cloud.InstanceTags{
458                 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
459                 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
460         }
461         save := false
462         for k, v := range update {
463                 if tags[k] != v {
464                         tags[k] = v
465                         save = true
466                 }
467         }
468         if save {
469                 go func() {
470                         err := instance.SetTags(tags)
471                         if err != nil {
472                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
473                         }
474                 }()
475         }
476 }
477
478 func (wkr *worker) Close() {
479         // This might take time, so do it after unlocking mtx.
480         defer wkr.executor.Close()
481
482         wkr.mtx.Lock()
483         defer wkr.mtx.Unlock()
484         for uuid, rr := range wkr.running {
485                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
486                 rr.Close()
487         }
488         for uuid, rr := range wkr.starting {
489                 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
490                 rr.Close()
491         }
492 }
493
494 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
495 // probe. Returns true if anything was added or removed.
496 //
497 // Caller must have lock.
498 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
499         alive := map[string]bool{}
500         for _, uuid := range ctrUUIDs {
501                 alive[uuid] = true
502                 if _, ok := wkr.running[uuid]; ok {
503                         // unchanged
504                 } else if rr, ok := wkr.starting[uuid]; ok {
505                         wkr.running[uuid] = rr
506                         delete(wkr.starting, uuid)
507                         changed = true
508                 } else {
509                         // We didn't start it -- it must have been
510                         // started by a previous dispatcher process.
511                         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
512                         wkr.running[uuid] = newRemoteRunner(uuid, wkr)
513                         changed = true
514                 }
515         }
516         for uuid := range wkr.running {
517                 if !alive[uuid] {
518                         wkr.closeRunner(uuid)
519                         changed = true
520                 }
521         }
522         return
523 }
524
525 // caller must have lock.
526 func (wkr *worker) closeRunner(uuid string) {
527         rr := wkr.running[uuid]
528         if rr == nil {
529                 return
530         }
531         wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
532         delete(wkr.running, uuid)
533         rr.Close()
534
535         now := time.Now()
536         wkr.updated = now
537         wkr.wp.exited[uuid] = now
538         if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
539                 wkr.state = StateIdle
540         }
541 }