14325: Log stderr from last boot-probe when giving up on boot.
[arvados.git] / lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "fmt"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
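// For example (an illustration, not taken from this package's tests),
// json.Marshal(map[State]int{StateIdle: 3}) produces {"idle":3} rather
// than {"2":3}.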
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}

type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx          sync.Locker // must be wp's Locker.
        state        State
        idleBehavior IdleBehavior
        instance     cloud.Instance
        instType     arvados.InstanceType
        vcpus        int64
        memory       int64
        appeared     time.Time
        probed       time.Time
        updated      time.Time
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
        running      map[string]struct{} // remember to update state idle<->running when this changes
        starting     map[string]struct{} // remember to update state idle<->running when this changes
        probing      chan struct{}
}

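// startContainer records ctr as starting, then asynchronously runs
// "crunch-run --detach {UUID}" on the instance via the worker's
// executor. When the command returns, the container is moved to
// wkr.running -- even on error, since the process may have started
// despite a communication failure; the next probe sorts it out.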
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger = logger.WithField("Instance", wkr.instance)
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
        go func() {
                env := map[string]string{
                        "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                        "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                }
                stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = struct{}{}
                wkr.lastUUID = ctr.UUID
                if err != nil {
                        logger.WithField("stdout", string(stdout)).
                                WithField("stderr", string(stderr)).
                                WithError(err).
                                Error("error starting crunch-run process")
                        // Leave uuid in wkr.running, though: it's
                        // possible the error was just a communication
                        // failure and the process was in fact
                        // started.  Wait for next probe to find out.
                        return
                }
                logger.Info("crunch-run process started")
                wkr.lastUUID = ctr.UUID
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
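//
// Concurrent calls are coalesced: a successful non-blocking send on
// wkr.probing claims the probe slot, and a caller that finds the slot
// occupied returns immediately (this assumes wkr.probing is created
// with a buffer of one elsewhere in the pool code).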
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        initialState := wkr.state
        wkr.mtx.Unlock()

        var (
                booted   bool
                ctrUUIDs []string
                ok       bool
                stderr   []byte // from probeBooted
        )

        switch initialState {
        case StateShutdown:
                return
        case StateIdle, StateRunning:
                booted = true
        case StateUnknown, StateBooting:
        default:
                panic(fmt.Sprintf("unknown state %s", initialState))
        }

        probeStart := time.Now()
        logger := wkr.logger.WithField("ProbeStart", probeStart)

        if !booted {
                booted, stderr = wkr.probeBooted()
                if !booted {
                        // Pretend this probe succeeded if another
                        // concurrent attempt succeeded.
                        wkr.mtx.Lock()
                        booted = wkr.state == StateRunning || wkr.state == StateIdle
                        wkr.mtx.Unlock()
                }
                if booted {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
        if booted || wkr.state == StateUnknown {
                ctrUUIDs, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                // Using the start time of the probe as the timeout
                // threshold ensures we always initiate at least one
                // probe attempt after the boot/probe timeout expires
                // (otherwise, a slow probe failure could cause us to
                // shut down an instance even though it did in fact
                // boot/recover before the timeout expired).
                dur := probeStart.Sub(wkr.probed)
                if wkr.shutdownIfBroken(dur) {
                        // stderr from failed run-probes will have
                        // been logged already, but boot-probe
                        // failures are normal so they are logged only
                        // at Debug level. This is our chance to log
                        // some evidence about why the node never
                        // booted, even in non-debug mode.
                        if !booted {
                                logger.WithFields(logrus.Fields{
                                        "Duration": dur,
                                        "stderr":   string(stderr),
                                }).Info("boot failed")
                        }
                }
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // The worker actually became idle at some point
                // between wkr.busy and now; now is the earliest
                // moment we can act on the idle state.
                wkr.busy = updateTime
        }
        changed := false

        // Build a new "running" map. Set changed=true if it differs
        // from the existing map (wkr.running) to ensure the scheduler
        // gets notified below.
        running := map[string]struct{}{}
        for _, uuid := range ctrUUIDs {
                running[uuid] = struct{}{}
                if _, ok := wkr.running[uuid]; !ok {
                        if _, ok := wkr.starting[uuid]; !ok {
                                // We didn't start it -- it must have
                                // been started by a previous
                                // dispatcher process.
                                logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
                        }
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if _, ok := running[uuid]; !ok {
                        logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
                        wkr.wp.notifyExited(uuid, updateTime)
                        changed = true
                }
        }

        // Update state if this was the first successful boot-probe.
        if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
                // Note: this will change again below if
                // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }

        // If wkr.state and wkr.running aren't changing then there's
        // no need to log anything, notify the scheduler, move state
        // back and forth between idle/running, etc.
        if !changed {
                return
        }

        // Log whenever a run-probe reveals crunch-run processes
        // appearing/disappearing before boot-probe succeeds.
        if wkr.state == StateUnknown && len(running) != len(wkr.running) {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(running),
                        "State":             wkr.state,
                }).Info("crunch-run probe succeeded, but boot probe is still failing")
        }

        wkr.running = running
        if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                wkr.state = StateRunning
        } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                wkr.state = StateIdle
        }
        wkr.updated = updateTime
        if booted && (initialState == StateUnknown || initialState == StateBooting) {
                logger.WithFields(logrus.Fields{
                        "RunningContainers": len(running),
                        "State":             wkr.state,
                }).Info("probes succeeded, instance is in service")
        }
        go wkr.wp.notify()
}

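// probeRunning runs "crunch-run --list" on the instance and returns
// the container UUIDs reported on stdout, one per line. ok reports
// whether the probe command itself succeeded.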
func (wkr *worker) probeRunning() (running []string, ok bool) {
        cmd := "crunch-run --list"
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                return nil, false
        }
        stdout = bytes.TrimRight(stdout, "\n")
        if len(stdout) == 0 {
                return nil, true
        }
        return strings.Split(string(stdout), "\n"), true
}

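// probeBooted runs the pool's boot probe command (or "true" if none
// is configured) on the instance. It returns whether the command
// succeeded, along with its stderr so the caller can log evidence if
// the instance never finishes booting.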
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        return true, stderr
}

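// shutdownIfBroken initiates shutdown if the instance has been
// unresponsive longer than the applicable timeout (timeoutBooting
// while still booting or in an unknown state, timeoutProbe
// otherwise). Workers held via IdleBehaviorHold are never shut down.
// It returns true if shutdown was initiated.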
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
        return true
}

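// shutdownIfIdle initiates shutdown if the worker has been idle
// longer than timeoutIdle, or (with idle behavior "drain") as soon as
// it is idle or still booting. Workers held via IdleBehaviorHold are
// never shut down. It returns true if shutdown was initiated.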
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                // Never shut down.
                return false
        }
        age := time.Since(wkr.busy)

        old := age >= wkr.wp.timeoutIdle
        draining := wkr.idleBehavior == IdleBehaviorDrain
        shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
                (draining && wkr.state == StateBooting)
        if !shouldShutdown {
                return false
        }

        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
                "Age":          age,
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown idle worker")
        wkr.shutdown()
        return true
}

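// shutdown marks the worker as shut down, notifies the pool, and
// destroys the cloud instance in a separate goroutine.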
// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                        return
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        have := instance.Tags()
        want := cloud.InstanceTags{
                tagKeyInstanceType: wkr.instType.Name,
                tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        go func() {
                for k, v := range want {
                        if v == have[k] {
                                continue
                        }
                        err := instance.SetTags(want)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
                        }
                        break
                }
        }()
}