14325: Shutdown unknown-state worker after boot timeout.
lib/dispatchcloud/worker/worker.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
)

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
        IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
        IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
        IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
        IdleBehaviorRun:   true,
        IdleBehaviorHold:  true,
        IdleBehaviorDrain: true,
}

type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx          sync.Locker // must be wp's Locker.
        state        State
        idleBehavior IdleBehavior
        instance     cloud.Instance
        instType     arvados.InstanceType
        vcpus        int64
        memory       int64
        appeared     time.Time
        probed       time.Time
        updated      time.Time
        busy         time.Time
        destroyed    time.Time
        lastUUID     string
        running      map[string]struct{} // remember to update state idle<->running when this changes
        starting     map[string]struct{} // remember to update state idle<->running when this changes
        probing      chan struct{}
}

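// startContainer marks the container as starting, then runs
// "crunch-run --detach" on the instance in a new goroutine.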
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger = logger.WithField("Instance", wkr.instance)
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
        go func() {
                env := map[string]string{
                        "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                        "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                }
                stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = struct{}{}
                wkr.lastUUID = ctr.UUID
                if err != nil {
                        logger.WithField("stdout", string(stdout)).
                                WithField("stderr", string(stderr)).
                                WithError(err).
                                Error("error starting crunch-run process")
                        // Leave uuid in wkr.running, though: it's
                        // possible the error was just a communication
                        // failure and the process was in fact
                        // started.  Wait for next probe to find out.
                        return
                }
                logger.Info("crunch-run process started")
        }()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}

// probeAndUpdate runs the appropriate probes and updates the
// worker's state accordingly. Caller must not hold the lock.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
        needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
        wkr.mtx.Unlock()
        if !needProbeBooted && !needProbeRunning {
                return
        }

        var (
                ctrUUIDs []string
                ok       bool
                stderr   []byte
        )
        if needProbeBooted {
                ok, stderr = wkr.probeBooted()
                wkr.mtx.Lock()
                if ok || wkr.state == StateRunning || wkr.state == StateIdle {
                        wkr.logger.Info("instance booted; will try probeRunning")
                        needProbeRunning = true
                }
                wkr.mtx.Unlock()
        }
        if needProbeRunning {
                ctrUUIDs, ok, stderr = wkr.probeRunning()
        }
        logger := wkr.logger.WithField("stderr", string(stderr))
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !ok {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                dur := time.Since(wkr.probed)
                logger := logger.WithFields(logrus.Fields{
                        "Duration": dur,
                        "State":    wkr.state,
                })
                if wkr.state == StateBooting && !needProbeRunning {
                        // If we know the instance has never passed a
                        // boot probe, it's not noteworthy that it
                        // hasn't passed this probe.
                        logger.Debug("new instance not responding")
                } else {
                        logger.Info("instance not responding")
                }
                wkr.shutdownIfBroken(dur)
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
        running := map[string]struct{}{}
        changed := false
        for _, uuid := range ctrUUIDs {
                running[uuid] = struct{}{}
                if _, ok := wkr.running[uuid]; !ok {
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if _, ok := running[uuid]; !ok {
                        logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
                        wkr.wp.notifyExited(uuid, updateTime)
                        changed = true
                }
        }
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                wkr.state = StateIdle
                changed = true
        }
        if changed {
                wkr.running = running
                if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                        wkr.state = StateRunning
                } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                        wkr.state = StateIdle
                }
                wkr.updated = updateTime
                go wkr.wp.notify()
        }
}

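// probeRunning runs "crunch-run --list" on the instance and returns
// the UUIDs of the containers it reports as running.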
func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
        cmd := "crunch-run --list"
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                return nil, false, stderr
        }
        stdout = bytes.TrimRight(stdout, "\n")
        if len(stdout) == 0 {
                return nil, true, stderr
        }
        return strings.Split(string(stdout), "\n"), true, stderr
}

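// probeBooted runs the configured boot probe command (or "true" if
// none is configured) on the instance, and reports whether it
// succeeded.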
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        return true, stderr
}

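// shutdownIfBroken shuts down the worker if it has been unresponsive
// longer than the applicable timeout (timeoutBooting for new
// instances, timeoutProbe otherwise). Workers with IdleBehaviorHold
// are never shut down here.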
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) {
        if wkr.idleBehavior == IdleBehaviorHold {
                return
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
}

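// shutdownIfIdle shuts down the worker if it has been idle longer
// than timeoutIdle, or, if its IdleBehavior is "drain", as soon as
// it is idle or still booting. It reports whether it initiated a
// shutdown.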
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if wkr.idleBehavior == IdleBehaviorHold {
                return false
        }
        if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
                return false
        }
        age := time.Since(wkr.busy)
        if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
                return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Age":          age,
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown idle worker")
        wkr.shutdown()
        return true
}

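// shutdown updates the worker's state to StateShutdown and destroys
// the cloud instance in a new goroutine.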
// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                if err := wkr.instance.Destroy(); err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                }
        }()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
        instance := wkr.instance
        have := instance.Tags()
        want := cloud.InstanceTags{
                tagKeyInstanceType: wkr.instType.Name,
                tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
        go func() {
                for k, v := range want {
                        if v == have[k] {
                                continue
                        }
                        err := instance.SetTags(want)
                        if err != nil {
                                wkr.wp.logger.WithField("Instance", instance).WithError(err).Warn("error updating tags")
                        }
                        break
                }
        }()
}