14807: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/stats"
18         "github.com/sirupsen/logrus"
19 )
20
// maxPingFailTime is hard-coded to ten minutes for now.
//
// TODO: configurable
const maxPingFailTime = 10 * time.Minute
25
// State describes a worker's availability: whether it can accept
// work now and, if it cannot, whether/when it is expected to become
// ready.
type State int

// Worker states. The zero value is StateUnknown.
const (
	// StateUnknown: might be running a container already.
	StateUnknown State = iota
	// StateBooting: instance is booting.
	StateBooting
	// StateIdle: instance booted, no containers are running.
	StateIdle
	// StateRunning: instance is running one or more containers.
	StateRunning
	// StateShutdown: worker has stopped monitoring the instance.
	StateShutdown
)

// stateString maps each known State to its textual name.
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer. A State that has no entry in
// stateString yields the empty string.
func (s State) String() string {
	name := stateString[s]
	return name
}

// MarshalText implements encoding.TextMarshaler, so that a JSON
// encoding of map[State]anything is keyed by the state's string
// representation rather than its numeric value. It never returns an
// error.
func (s State) MarshalText() ([]byte, error) {
	text := []byte(s.String())
	return text, nil
}
56
// IdleBehavior selects what should happen to a node once it becomes
// idle.
type IdleBehavior string

// Recognized idle behaviors.
const (
	// IdleBehaviorRun: run containers, or shutdown on idle timeout.
	IdleBehaviorRun IdleBehavior = "run"
	// IdleBehaviorHold: don't shutdown or run more containers.
	IdleBehaviorHold IdleBehavior = "hold"
	// IdleBehaviorDrain: shutdown immediately when idle.
	IdleBehaviorDrain IdleBehavior = "drain"
)

// validIdleBehavior is the set of recognized IdleBehavior values; a
// lookup of any other value yields false.
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorDrain: true,
	IdleBehaviorHold:  true,
	IdleBehaviorRun:   true,
}
71
72 type worker struct {
73         logger   logrus.FieldLogger
74         executor Executor
75         wp       *Pool
76
77         mtx          sync.Locker // must be wp's Locker.
78         state        State
79         idleBehavior IdleBehavior
80         instance     cloud.Instance
81         instType     arvados.InstanceType
82         vcpus        int64
83         memory       int64
84         appeared     time.Time
85         probed       time.Time
86         updated      time.Time
87         busy         time.Time
88         destroyed    time.Time
89         lastUUID     string
90         running      map[string]struct{} // remember to update state idle<->running when this changes
91         starting     map[string]struct{} // remember to update state idle<->running when this changes
92         probing      chan struct{}
93 }
94
95 // caller must have lock.
96 func (wkr *worker) startContainer(ctr arvados.Container) {
97         logger := wkr.logger.WithFields(logrus.Fields{
98                 "ContainerUUID": ctr.UUID,
99                 "Priority":      ctr.Priority,
100         })
101         logger = logger.WithField("Instance", wkr.instance.ID())
102         logger.Debug("starting container")
103         wkr.starting[ctr.UUID] = struct{}{}
104         wkr.state = StateRunning
105         go func() {
106                 env := map[string]string{
107                         "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
108                         "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
109                 }
110                 if wkr.wp.arvClient.Insecure {
111                         env["ARVADOS_API_HOST_INSECURE"] = "1"
112                 }
113                 envJSON, err := json.Marshal(env)
114                 if err != nil {
115                         panic(err)
116                 }
117                 stdin := bytes.NewBuffer(envJSON)
118                 cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
119                 if u := wkr.instance.RemoteUser(); u != "root" {
120                         cmd = "sudo " + cmd
121                 }
122                 stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
123                 wkr.mtx.Lock()
124                 defer wkr.mtx.Unlock()
125                 now := time.Now()
126                 wkr.updated = now
127                 wkr.busy = now
128                 delete(wkr.starting, ctr.UUID)
129                 wkr.running[ctr.UUID] = struct{}{}
130                 wkr.lastUUID = ctr.UUID
131                 if err != nil {
132                         logger.WithField("stdout", string(stdout)).
133                                 WithField("stderr", string(stderr)).
134                                 WithError(err).
135                                 Error("error starting crunch-run process")
136                         // Leave uuid in wkr.running, though: it's
137                         // possible the error was just a communication
138                         // failure and the process was in fact
139                         // started.  Wait for next probe to find out.
140                         return
141                 }
142                 logger.Info("crunch-run process started")
143                 wkr.lastUUID = ctr.UUID
144         }()
145 }
146
147 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
148 // for the worker's curent state. If a previous probe is still
149 // running, it does nothing.
150 //
151 // It should be called in a new goroutine.
152 func (wkr *worker) ProbeAndUpdate() {
153         select {
154         case wkr.probing <- struct{}{}:
155                 wkr.probeAndUpdate()
156                 <-wkr.probing
157         default:
158                 wkr.logger.Debug("still waiting for last probe to finish")
159         }
160 }
161
162 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
163 // updates state accordingly.
164 //
165 // In StateUnknown: Call both probeBooted and probeRunning.
166 // In StateBooting: Call probeBooted; if successful, call probeRunning.
167 // In StateRunning: Call probeRunning.
168 // In StateIdle: Call probeRunning.
169 // In StateShutdown: Do nothing.
170 //
171 // If both probes succeed, wkr.state changes to
172 // StateIdle/StateRunning.
173 //
174 // If probeRunning succeeds, wkr.running is updated. (This means
175 // wkr.running might be non-empty even in StateUnknown, if the boot
176 // probe failed.)
177 //
178 // probeAndUpdate should be called in a new goroutine.
179 func (wkr *worker) probeAndUpdate() {
180         wkr.mtx.Lock()
181         updated := wkr.updated
182         initialState := wkr.state
183         wkr.mtx.Unlock()
184
185         var (
186                 booted   bool
187                 ctrUUIDs []string
188                 ok       bool
189                 stderr   []byte // from probeBooted
190         )
191
192         switch initialState {
193         case StateShutdown:
194                 return
195         case StateIdle, StateRunning:
196                 booted = true
197         case StateUnknown, StateBooting:
198         default:
199                 panic(fmt.Sprintf("unknown state %s", initialState))
200         }
201
202         probeStart := time.Now()
203         logger := wkr.logger.WithField("ProbeStart", probeStart)
204
205         if !booted {
206                 booted, stderr = wkr.probeBooted()
207                 if !booted {
208                         // Pretend this probe succeeded if another
209                         // concurrent attempt succeeded.
210                         wkr.mtx.Lock()
211                         booted = wkr.state == StateRunning || wkr.state == StateIdle
212                         wkr.mtx.Unlock()
213                 }
214                 if booted {
215                         logger.Info("instance booted; will try probeRunning")
216                 }
217         }
218         if booted || wkr.state == StateUnknown {
219                 ctrUUIDs, ok = wkr.probeRunning()
220         }
221         wkr.mtx.Lock()
222         defer wkr.mtx.Unlock()
223         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
224                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
225                         // Skip the logging noise if shutdown was
226                         // initiated during probe.
227                         return
228                 }
229                 // Using the start time of the probe as the timeout
230                 // threshold ensures we always initiate at least one
231                 // probe attempt after the boot/probe timeout expires
232                 // (otherwise, a slow probe failure could cause us to
233                 // shutdown an instance even though it did in fact
234                 // boot/recover before the timeout expired).
235                 dur := probeStart.Sub(wkr.probed)
236                 if wkr.shutdownIfBroken(dur) {
237                         // stderr from failed run-probes will have
238                         // been logged already, but boot-probe
239                         // failures are normal so they are logged only
240                         // at Debug level. This is our chance to log
241                         // some evidence about why the node never
242                         // booted, even in non-debug mode.
243                         if !booted {
244                                 logger.WithFields(logrus.Fields{
245                                         "Duration": dur,
246                                         "stderr":   string(stderr),
247                                 }).Info("boot failed")
248                         }
249                 }
250                 return
251         }
252
253         updateTime := time.Now()
254         wkr.probed = updateTime
255
256         if updated != wkr.updated {
257                 // Worker was updated after the probe began, so
258                 // wkr.running might have a container UUID that was
259                 // not yet running when ctrUUIDs was generated. Leave
260                 // wkr.running alone and wait for the next probe to
261                 // catch up on any changes.
262                 return
263         }
264
265         if len(ctrUUIDs) > 0 {
266                 wkr.busy = updateTime
267                 wkr.lastUUID = ctrUUIDs[0]
268         } else if len(wkr.running) > 0 {
269                 // Actual last-busy time was sometime between wkr.busy
270                 // and now. Now is the earliest opportunity to take
271                 // advantage of the non-busy state, though.
272                 wkr.busy = updateTime
273         }
274         changed := false
275
276         // Build a new "running" map. Set changed=true if it differs
277         // from the existing map (wkr.running) to ensure the scheduler
278         // gets notified below.
279         running := map[string]struct{}{}
280         for _, uuid := range ctrUUIDs {
281                 running[uuid] = struct{}{}
282                 if _, ok := wkr.running[uuid]; !ok {
283                         if _, ok := wkr.starting[uuid]; !ok {
284                                 // We didn't start it -- it must have
285                                 // been started by a previous
286                                 // dispatcher process.
287                                 logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
288                         }
289                         changed = true
290                 }
291         }
292         for uuid := range wkr.running {
293                 if _, ok := running[uuid]; !ok {
294                         logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
295                         wkr.wp.notifyExited(uuid, updateTime)
296                         changed = true
297                 }
298         }
299
300         // Update state if this was the first successful boot-probe.
301         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
302                 // Note: this will change again below if
303                 // len(wkr.starting)+len(wkr.running) > 0.
304                 wkr.state = StateIdle
305                 changed = true
306         }
307
308         // If wkr.state and wkr.running aren't changing then there's
309         // no need to log anything, notify the scheduler, move state
310         // back and forth between idle/running, etc.
311         if !changed {
312                 return
313         }
314
315         // Log whenever a run-probe reveals crunch-run processes
316         // appearing/disappearing before boot-probe succeeds.
317         if wkr.state == StateUnknown && len(running) != len(wkr.running) {
318                 logger.WithFields(logrus.Fields{
319                         "RunningContainers": len(running),
320                         "State":             wkr.state,
321                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
322         }
323
324         wkr.running = running
325         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
326                 wkr.state = StateRunning
327         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
328                 wkr.state = StateIdle
329         }
330         wkr.updated = updateTime
331         if booted && (initialState == StateUnknown || initialState == StateBooting) {
332                 logger.WithFields(logrus.Fields{
333                         "RunningContainers": len(running),
334                         "State":             wkr.state,
335                 }).Info("probes succeeded, instance is in service")
336         }
337         go wkr.wp.notify()
338 }
339
340 func (wkr *worker) probeRunning() (running []string, ok bool) {
341         cmd := "crunch-run --list"
342         if u := wkr.instance.RemoteUser(); u != "root" {
343                 cmd = "sudo " + cmd
344         }
345         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
346         if err != nil {
347                 wkr.logger.WithFields(logrus.Fields{
348                         "Command": cmd,
349                         "stdout":  string(stdout),
350                         "stderr":  string(stderr),
351                 }).WithError(err).Warn("probe failed")
352                 return nil, false
353         }
354         stdout = bytes.TrimRight(stdout, "\n")
355         if len(stdout) == 0 {
356                 return nil, true
357         }
358         return strings.Split(string(stdout), "\n"), true
359 }
360
361 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
362         cmd := wkr.wp.bootProbeCommand
363         if cmd == "" {
364                 cmd = "true"
365         }
366         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
367         logger := wkr.logger.WithFields(logrus.Fields{
368                 "Command": cmd,
369                 "stdout":  string(stdout),
370                 "stderr":  string(stderr),
371         })
372         if err != nil {
373                 logger.WithError(err).Debug("boot probe failed")
374                 return false, stderr
375         }
376         logger.Info("boot probe succeeded")
377         return true, stderr
378 }
379
380 // caller must have lock.
381 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
382         if wkr.idleBehavior == IdleBehaviorHold {
383                 // Never shut down.
384                 return false
385         }
386         label, threshold := "", wkr.wp.timeoutProbe
387         if wkr.state == StateUnknown || wkr.state == StateBooting {
388                 label, threshold = "new ", wkr.wp.timeoutBooting
389         }
390         if dur < threshold {
391                 return false
392         }
393         wkr.logger.WithFields(logrus.Fields{
394                 "Duration": dur,
395                 "Since":    wkr.probed,
396                 "State":    wkr.state,
397         }).Warnf("%sinstance unresponsive, shutting down", label)
398         wkr.shutdown()
399         return true
400 }
401
402 // caller must have lock.
403 func (wkr *worker) shutdownIfIdle() bool {
404         if wkr.idleBehavior == IdleBehaviorHold {
405                 // Never shut down.
406                 return false
407         }
408         age := time.Since(wkr.busy)
409
410         old := age >= wkr.wp.timeoutIdle
411         draining := wkr.idleBehavior == IdleBehaviorDrain
412         shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
413                 (draining && wkr.state == StateBooting)
414         if !shouldShutdown {
415                 return false
416         }
417
418         wkr.logger.WithFields(logrus.Fields{
419                 "State":        wkr.state,
420                 "IdleDuration": stats.Duration(age),
421                 "IdleBehavior": wkr.idleBehavior,
422         }).Info("shutdown idle worker")
423         wkr.shutdown()
424         return true
425 }
426
427 // caller must have lock.
428 func (wkr *worker) shutdown() {
429         now := time.Now()
430         wkr.updated = now
431         wkr.destroyed = now
432         wkr.state = StateShutdown
433         go wkr.wp.notify()
434         go func() {
435                 err := wkr.instance.Destroy()
436                 if err != nil {
437                         wkr.logger.WithError(err).Warn("shutdown failed")
438                         return
439                 }
440         }()
441 }
442
443 // Save worker tags to cloud provider metadata, if they don't already
444 // match. Caller must have lock.
445 func (wkr *worker) saveTags() {
446         instance := wkr.instance
447         tags := instance.Tags()
448         update := cloud.InstanceTags{
449                 tagKeyInstanceType: wkr.instType.Name,
450                 tagKeyIdleBehavior: string(wkr.idleBehavior),
451         }
452         save := false
453         for k, v := range update {
454                 if tags[k] != v {
455                         tags[k] = v
456                         save = true
457                 }
458         }
459         if save {
460                 go func() {
461                         err := instance.SetTags(tags)
462                         if err != nil {
463                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
464                         }
465                 }()
466         }
467 }