14325: Don't shutdown busy VMs even if boot probe fails.
[arvados.git] lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	vcpus        int64
	memory       int64
	appeared     time.Time
	probed       time.Time
	updated      time.Time
	busy         time.Time
	destroyed    time.Time
	lastUUID     string
	running      map[string]struct{} // remember to update state idle<->running when this changes
	starting     map[string]struct{} // remember to update state idle<->running when this changes
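	// probing works as a single-slot semaphore: ProbeAndUpdate
	// puts a token in before probing and takes it out afterward,
	// so at most one probe runs at a time. (The channel is
	// assumed to be created with capacity 1 by the pool.)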
	probing chan struct{}
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger = logger.WithField("Instance", wkr.instance)
	logger.Debug("starting container")
	wkr.starting[ctr.UUID] = struct{}{}
	wkr.state = StateRunning
	go func() {
		env := map[string]string{
			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
		}
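		// Single-quoting ctr.UUID is safe here: Arvados UUIDs
		// contain only lowercase letters, digits, and hyphens,
		// so no shell escaping beyond the quotes is needed.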
		stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		wkr.lastUUID = ctr.UUID
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started.  Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		} else {
			wkr.logger.Info("instance booted; will try probeRunning")
		}
	}
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, ok, stderr = wkr.probeRunning()
	}
	logger := wkr.logger.WithField("stderr", string(stderr))
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
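	// The instance is only treated as broken if the
	// running-containers probe failed, or if the boot probe failed
	// and nothing is running (or believed to be running). A failed
	// boot probe alone doesn't shut down a busy VM.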
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if !booted {
			// While we're polling the VM to see if it's
			// finished booting, failures are not
			// noteworthy, so we log at Debug level.
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		wkr.shutdownIfBroken(dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	} else if wkr.state == StateUnknown && len(running) != len(wkr.running) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}
	if !changed {
		return
	}

	wkr.running = running
	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
	cmd := "crunch-run --list"
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false, stderr
	}
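	// crunch-run --list prints one container UUID per line; empty
	// output means no crunch-run processes are running.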
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true, stderr
	}
	return strings.Split(string(stdout), "\n"), true, stderr
}

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
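	// With no boot probe command configured, fall back to "true",
	// so the probe succeeds as soon as the SSH target accepts a
	// connection and can run a command.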
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) {
	if wkr.idleBehavior == IdleBehaviorHold {
		return
	}
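	// Instances that are still coming up are judged against the
	// boot timeout; instances already in service are judged
	// against the probe timeout.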
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
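	// With IdleBehaviorDrain, shut down as soon as the worker is
	// idle (even if it's still booting), without waiting for the
	// idle timeout.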
	if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
		return false
	}
	age := time.Since(wkr.busy)
	if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Age":          age,
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown idle worker")
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
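	// Destroy typically involves a cloud API call, so run it in
	// its own goroutine. The worker is already marked
	// StateShutdown, so it won't be given new work meanwhile.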
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	have := instance.Tags()
	want := cloud.InstanceTags{
		tagKeyInstanceType: wkr.instType.Name,
		tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	go func() {
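		// SetTags is called at most once: as soon as any tag
		// differs from what we want, write the complete
		// desired tag set and stop comparing.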
		for k, v := range want {
			if v == have[k] {
				continue
			}
			err := instance.SetTags(want)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance).WithError(err).Warn("error updating tags")
			}
			break
		}
	}()
}