14920: Update metrics when worker state changes idle->running.
[arvados.git] / lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
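
// For illustration only: because State implements encoding.TextMarshaler,
// a JSON encoding of a map keyed by State uses the readable names above as
// object keys rather than numeric values, e.g.
//
//	buf, _ := json.Marshal(map[State]int{StateIdle: 3, StateRunning: 2})
//	fmt.Println(string(buf)) // {"idle":3,"running":2}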

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
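
// A minimal sketch of how validIdleBehavior can be used to vet an
// operator-supplied value (hypothetical helper, for illustration only):
//
//	func parseIdleBehavior(s string) (IdleBehavior, error) {
//		b := IdleBehavior(s)
//		if !validIdleBehavior[b] {
//			return "", fmt.Errorf("invalid idle behavior %q", s)
//		}
//		return b, nil
//	}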

type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	vcpus        int64
	memory       int64
	appeared     time.Time
	probed       time.Time
	updated      time.Time
	busy         time.Time
	destroyed    time.Time
	lastUUID     string
	running      map[string]struct{} // remember to update state idle<->running when this changes
	starting     map[string]struct{} // remember to update state idle<->running when this changes
	probing      chan struct{}
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger = logger.WithField("Instance", wkr.instance.ID())
	logger.Debug("starting container")
	wkr.starting[ctr.UUID] = struct{}{}
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		env := map[string]string{
			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
		}
		if wkr.wp.arvClient.Insecure {
			env["ARVADOS_API_HOST_INSECURE"] = "1"
		}
		envJSON, err := json.Marshal(env)
		if err != nil {
			panic(err)
		}
		stdin := bytes.NewBuffer(envJSON)
		cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
		if u := wkr.instance.RemoteUser(); u != "root" {
			cmd = "sudo " + cmd
		}
		stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		wkr.lastUUID = ctr.UUID
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started.  Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
		wkr.lastUUID = ctr.UUID
	}()
}
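
// For illustration only: for a container UUID of the form "zzzzz-dz642-..."
// on an instance whose remote user is not root, the goroutine above ends up
// running approximately
//
//	sudo crunch-run --detach --stdin-env 'zzzzz-dz642-...'
//
// with the environment supplied on stdin as a JSON document like
// {"ARVADOS_API_HOST":"...","ARVADOS_API_TOKEN":"..."}, which keeps the
// token off the remote command line.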

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
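
// The select above is a non-blocking "try-lock". A minimal sketch of the
// same idiom, assuming wkr.probing is created with capacity 1 (it has to be
// buffered for the send to ever succeed without a waiting receiver):
//
//	probing := make(chan struct{}, 1)
//	select {
//	case probing <- struct{}{}:
//		doProbe() // hypothetical placeholder for the actual work
//		<-probing
//	default:
//		// a probe is already in flight; skip this round
//	}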

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	changed := false

	// Build a new "running" map. Set changed=true if it differs
	// from the existing map (wkr.running) to ensure the scheduler
	// gets notified below.
	running := map[string]struct{}{}
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			if _, ok := wkr.starting[uuid]; !ok {
				// We didn't start it -- it must have
				// been started by a previous
				// dispatcher process.
				logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			}
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && len(running) != len(wkr.running) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	wkr.running = running
	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, ok bool) {
	cmd := "crunch-run --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true
	}
	return strings.Split(string(stdout), "\n"), true
}
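
// For illustration only: the parsing above assumes "crunch-run --list"
// prints one container UUID per line, e.g.
//
//	zzzzz-dz642-aaaaaaaaaaaaaaa
//	zzzzz-dz642-bbbbbbbbbbbbbbb
//
// which yields those two UUIDs with ok=true; empty output yields
// (nil, true), i.e. the probe succeeded and nothing is running.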

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	age := time.Since(wkr.busy)

	old := age >= wkr.wp.timeoutIdle
	draining := wkr.idleBehavior == IdleBehaviorDrain
	shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
		(draining && wkr.state == StateBooting)
	if !shouldShutdown {
		return false
	}

	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(age),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown idle worker")
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		tagKeyInstanceType: wkr.instType.Name,
		tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}
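
// For illustration only (the tagKey* constants are defined elsewhere in
// this package): for a worker of a hypothetical instance type "m4.large"
// that is being drained, the desired tag set computed above is roughly
//
//	cloud.InstanceTags{
//		tagKeyInstanceType: "m4.large",
//		tagKeyIdleBehavior: "drain",
//	}
//
// and SetTags is only called when at least one of these values differs
// from the tags already stored on the instance.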