14807: Log full instance ID.
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "fmt"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/lib/cloud"
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/sirupsen/logrus"
17 )
18
19 const (
20         // TODO: configurable
21         maxPingFailTime = 10 * time.Minute
22 )
23
24 // State indicates whether a worker is available to do work, and (if
25 // not) whether/when it is expected to become ready.
26 type State int
27
28 const (
29         StateUnknown  State = iota // might be running a container already
30         StateBooting               // instance is booting
31         StateIdle                  // instance booted, no containers are running
32         StateRunning               // instance is running one or more containers
33         StateShutdown              // worker has stopped monitoring the instance
34 )
35
36 var stateString = map[State]string{
37         StateUnknown:  "unknown",
38         StateBooting:  "booting",
39         StateIdle:     "idle",
40         StateRunning:  "running",
41         StateShutdown: "shutdown",
42 }
43
44 // String implements fmt.Stringer.
45 func (s State) String() string {
46         return stateString[s]
47 }
48
49 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
50 // map[State]anything uses the state's string representation.
51 func (s State) MarshalText() ([]byte, error) {
52         return []byte(stateString[s]), nil
53 }
54
55 // IdleBehavior indicates the behavior desired when a node becomes idle.
56 type IdleBehavior string
57
58 const (
59         IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
60         IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
61         IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
62 )
63
64 var validIdleBehavior = map[IdleBehavior]bool{
65         IdleBehaviorRun:   true,
66         IdleBehaviorHold:  true,
67         IdleBehaviorDrain: true,
68 }
69
70 type worker struct {
71         logger   logrus.FieldLogger
72         executor Executor
73         wp       *Pool
74
75         mtx          sync.Locker // must be wp's Locker.
76         state        State
77         idleBehavior IdleBehavior
78         instance     cloud.Instance
79         instType     arvados.InstanceType
80         vcpus        int64
81         memory       int64
82         appeared     time.Time
83         probed       time.Time
84         updated      time.Time
85         busy         time.Time
86         destroyed    time.Time
87         lastUUID     string
88         running      map[string]struct{} // remember to update state idle<->running when this changes
89         starting     map[string]struct{} // remember to update state idle<->running when this changes
90         probing      chan struct{}
91 }
92
93 // caller must have lock.
94 func (wkr *worker) startContainer(ctr arvados.Container) {
95         logger := wkr.logger.WithFields(logrus.Fields{
96                 "ContainerUUID": ctr.UUID,
97                 "Priority":      ctr.Priority,
98         })
99         logger = logger.WithField("Instance", wkr.instance.ID())
100         logger.Debug("starting container")
101         wkr.starting[ctr.UUID] = struct{}{}
102         wkr.state = StateRunning
103         go func() {
104                 cmd := "crunch-run --detach '" + ctr.UUID + "'"
105                 stdin := bytes.NewBufferString(fmt.Sprintf("export %s=%q\nexport %s=%q\n",
106                         "ARVADOS_API_HOST", wkr.wp.arvClient.APIHost,
107                         "ARVADOS_API_TOKEN", wkr.wp.arvClient.AuthToken))
108                 if u := wkr.instance.RemoteUser(); u != "root" {
109                         cmd = "sudo -E " + cmd
110                 }
111                 cmd = "source /dev/stdin; " + cmd
112                 stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
113                 wkr.mtx.Lock()
114                 defer wkr.mtx.Unlock()
115                 now := time.Now()
116                 wkr.updated = now
117                 wkr.busy = now
118                 delete(wkr.starting, ctr.UUID)
119                 wkr.running[ctr.UUID] = struct{}{}
120                 wkr.lastUUID = ctr.UUID
121                 if err != nil {
122                         logger.WithField("stdout", string(stdout)).
123                                 WithField("stderr", string(stderr)).
124                                 WithError(err).
125                                 Error("error starting crunch-run process")
126                         // Leave uuid in wkr.running, though: it's
127                         // possible the error was just a communication
128                         // failure and the process was in fact
129                         // started.  Wait for next probe to find out.
130                         return
131                 }
132                 logger.Info("crunch-run process started")
133                 wkr.lastUUID = ctr.UUID
134         }()
135 }
136
137 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
138 // for the worker's curent state. If a previous probe is still
139 // running, it does nothing.
140 //
141 // It should be called in a new goroutine.
142 func (wkr *worker) ProbeAndUpdate() {
143         select {
144         case wkr.probing <- struct{}{}:
145                 wkr.probeAndUpdate()
146                 <-wkr.probing
147         default:
148                 wkr.logger.Debug("still waiting for last probe to finish")
149         }
150 }
151
152 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
153 // updates state accordingly.
154 //
155 // In StateUnknown: Call both probeBooted and probeRunning.
156 // In StateBooting: Call probeBooted; if successful, call probeRunning.
157 // In StateRunning: Call probeRunning.
158 // In StateIdle: Call probeRunning.
159 // In StateShutdown: Do nothing.
160 //
161 // If both probes succeed, wkr.state changes to
162 // StateIdle/StateRunning.
163 //
164 // If probeRunning succeeds, wkr.running is updated. (This means
165 // wkr.running might be non-empty even in StateUnknown, if the boot
166 // probe failed.)
167 //
168 // probeAndUpdate should be called in a new goroutine.
169 func (wkr *worker) probeAndUpdate() {
170         wkr.mtx.Lock()
171         updated := wkr.updated
172         initialState := wkr.state
173         wkr.mtx.Unlock()
174
175         var (
176                 booted   bool
177                 ctrUUIDs []string
178                 ok       bool
179                 stderr   []byte // from probeBooted
180         )
181
182         switch initialState {
183         case StateShutdown:
184                 return
185         case StateIdle, StateRunning:
186                 booted = true
187         case StateUnknown, StateBooting:
188         default:
189                 panic(fmt.Sprintf("unknown state %s", initialState))
190         }
191
192         probeStart := time.Now()
193         logger := wkr.logger.WithField("ProbeStart", probeStart)
194
195         if !booted {
196                 booted, stderr = wkr.probeBooted()
197                 if !booted {
198                         // Pretend this probe succeeded if another
199                         // concurrent attempt succeeded.
200                         wkr.mtx.Lock()
201                         booted = wkr.state == StateRunning || wkr.state == StateIdle
202                         wkr.mtx.Unlock()
203                 }
204                 if booted {
205                         logger.Info("instance booted; will try probeRunning")
206                 }
207         }
208         if booted || wkr.state == StateUnknown {
209                 ctrUUIDs, ok = wkr.probeRunning()
210         }
211         wkr.mtx.Lock()
212         defer wkr.mtx.Unlock()
213         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
214                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
215                         // Skip the logging noise if shutdown was
216                         // initiated during probe.
217                         return
218                 }
219                 // Using the start time of the probe as the timeout
220                 // threshold ensures we always initiate at least one
221                 // probe attempt after the boot/probe timeout expires
222                 // (otherwise, a slow probe failure could cause us to
223                 // shutdown an instance even though it did in fact
224                 // boot/recover before the timeout expired).
225                 dur := probeStart.Sub(wkr.probed)
226                 if wkr.shutdownIfBroken(dur) {
227                         // stderr from failed run-probes will have
228                         // been logged already, but boot-probe
229                         // failures are normal so they are logged only
230                         // at Debug level. This is our chance to log
231                         // some evidence about why the node never
232                         // booted, even in non-debug mode.
233                         if !booted {
234                                 logger.WithFields(logrus.Fields{
235                                         "Duration": dur,
236                                         "stderr":   string(stderr),
237                                 }).Info("boot failed")
238                         }
239                 }
240                 return
241         }
242
243         updateTime := time.Now()
244         wkr.probed = updateTime
245
246         if updated != wkr.updated {
247                 // Worker was updated after the probe began, so
248                 // wkr.running might have a container UUID that was
249                 // not yet running when ctrUUIDs was generated. Leave
250                 // wkr.running alone and wait for the next probe to
251                 // catch up on any changes.
252                 return
253         }
254
255         if len(ctrUUIDs) > 0 {
256                 wkr.busy = updateTime
257                 wkr.lastUUID = ctrUUIDs[0]
258         } else if len(wkr.running) > 0 {
259                 // Actual last-busy time was sometime between wkr.busy
260                 // and now. Now is the earliest opportunity to take
261                 // advantage of the non-busy state, though.
262                 wkr.busy = updateTime
263         }
264         changed := false
265
266         // Build a new "running" map. Set changed=true if it differs
267         // from the existing map (wkr.running) to ensure the scheduler
268         // gets notified below.
269         running := map[string]struct{}{}
270         for _, uuid := range ctrUUIDs {
271                 running[uuid] = struct{}{}
272                 if _, ok := wkr.running[uuid]; !ok {
273                         if _, ok := wkr.starting[uuid]; !ok {
274                                 // We didn't start it -- it must have
275                                 // been started by a previous
276                                 // dispatcher process.
277                                 logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
278                         }
279                         changed = true
280                 }
281         }
282         for uuid := range wkr.running {
283                 if _, ok := running[uuid]; !ok {
284                         logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
285                         wkr.wp.notifyExited(uuid, updateTime)
286                         changed = true
287                 }
288         }
289
290         // Update state if this was the first successful boot-probe.
291         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
292                 // Note: this will change again below if
293                 // len(wkr.starting)+len(wkr.running) > 0.
294                 wkr.state = StateIdle
295                 changed = true
296         }
297
298         // If wkr.state and wkr.running aren't changing then there's
299         // no need to log anything, notify the scheduler, move state
300         // back and forth between idle/running, etc.
301         if !changed {
302                 return
303         }
304
305         // Log whenever a run-probe reveals crunch-run processes
306         // appearing/disappearing before boot-probe succeeds.
307         if wkr.state == StateUnknown && len(running) != len(wkr.running) {
308                 logger.WithFields(logrus.Fields{
309                         "RunningContainers": len(running),
310                         "State":             wkr.state,
311                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
312         }
313
314         wkr.running = running
315         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
316                 wkr.state = StateRunning
317         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
318                 wkr.state = StateIdle
319         }
320         wkr.updated = updateTime
321         if booted && (initialState == StateUnknown || initialState == StateBooting) {
322                 logger.WithFields(logrus.Fields{
323                         "RunningContainers": len(running),
324                         "State":             wkr.state,
325                 }).Info("probes succeeded, instance is in service")
326         }
327         go wkr.wp.notify()
328 }
329
330 func (wkr *worker) probeRunning() (running []string, ok bool) {
331         cmd := "crunch-run --list"
332         if u := wkr.instance.RemoteUser(); u != "root" {
333                 cmd = "sudo " + cmd
334         }
335         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
336         if err != nil {
337                 wkr.logger.WithFields(logrus.Fields{
338                         "Command": cmd,
339                         "stdout":  string(stdout),
340                         "stderr":  string(stderr),
341                 }).WithError(err).Warn("probe failed")
342                 return nil, false
343         }
344         stdout = bytes.TrimRight(stdout, "\n")
345         if len(stdout) == 0 {
346                 return nil, true
347         }
348         return strings.Split(string(stdout), "\n"), true
349 }
350
351 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
352         cmd := wkr.wp.bootProbeCommand
353         if cmd == "" {
354                 cmd = "true"
355         }
356         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
357         logger := wkr.logger.WithFields(logrus.Fields{
358                 "Command": cmd,
359                 "stdout":  string(stdout),
360                 "stderr":  string(stderr),
361         })
362         if err != nil {
363                 logger.WithError(err).Debug("boot probe failed")
364                 return false, stderr
365         }
366         logger.Info("boot probe succeeded")
367         return true, stderr
368 }
369
370 // caller must have lock.
371 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
372         if wkr.idleBehavior == IdleBehaviorHold {
373                 // Never shut down.
374                 return false
375         }
376         label, threshold := "", wkr.wp.timeoutProbe
377         if wkr.state == StateUnknown || wkr.state == StateBooting {
378                 label, threshold = "new ", wkr.wp.timeoutBooting
379         }
380         if dur < threshold {
381                 return false
382         }
383         wkr.logger.WithFields(logrus.Fields{
384                 "Duration": dur,
385                 "Since":    wkr.probed,
386                 "State":    wkr.state,
387         }).Warnf("%sinstance unresponsive, shutting down", label)
388         wkr.shutdown()
389         return true
390 }
391
392 // caller must have lock.
393 func (wkr *worker) shutdownIfIdle() bool {
394         if wkr.idleBehavior == IdleBehaviorHold {
395                 // Never shut down.
396                 return false
397         }
398         age := time.Since(wkr.busy)
399
400         old := age >= wkr.wp.timeoutIdle
401         draining := wkr.idleBehavior == IdleBehaviorDrain
402         shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
403                 (draining && wkr.state == StateBooting)
404         if !shouldShutdown {
405                 return false
406         }
407
408         wkr.logger.WithFields(logrus.Fields{
409                 "State":        wkr.state,
410                 "Age":          age,
411                 "IdleBehavior": wkr.idleBehavior,
412         }).Info("shutdown idle worker")
413         wkr.shutdown()
414         return true
415 }
416
417 // caller must have lock.
418 func (wkr *worker) shutdown() {
419         now := time.Now()
420         wkr.updated = now
421         wkr.destroyed = now
422         wkr.state = StateShutdown
423         go wkr.wp.notify()
424         go func() {
425                 err := wkr.instance.Destroy()
426                 if err != nil {
427                         wkr.logger.WithError(err).Warn("shutdown failed")
428                         return
429                 }
430         }()
431 }
432
433 // Save worker tags to cloud provider metadata, if they don't already
434 // match. Caller must have lock.
435 func (wkr *worker) saveTags() {
436         instance := wkr.instance
437         have := instance.Tags()
438         want := cloud.InstanceTags{
439                 tagKeyInstanceType: wkr.instType.Name,
440                 tagKeyIdleBehavior: string(wkr.idleBehavior),
441         }
442         go func() {
443                 for k, v := range want {
444                         if v == have[k] {
445                                 continue
446                         }
447                         err := instance.SetTags(want)
448                         if err != nil {
449                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
450                         }
451                         break
452
453                 }
454         }()
455 }