14325: Log when a worker's first probe succeeds.
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "strings"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/lib/cloud"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "github.com/sirupsen/logrus"
16 )
17
const (
	// maxPingFailTime is the longest a worker may go without a
	// successful ping before being treated as broken.
	// NOTE(review): not referenced in this chunk — presumably
	// enforced by the pool; confirm where it is used.
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)
22
// State indicates a worker's availability for running containers,
// and (if unavailable) whether/when it is expected to become ready.
type State int

// Worker states, in the usual order of progression.
const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

// stateString maps each State to its human-readable name.
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so that, e.g., a
// JSON encoding of map[State]anything uses each state's string
// representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(s.String()), nil
}
53
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

// Supported idle behaviors. Each constant carries an explicit
// IdleBehavior type: previously only IdleBehaviorRun was typed, and
// the other two were untyped string constants (a `=` declaration in a
// const block does not inherit the previous line's type).
const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

// validIdleBehavior enumerates the accepted IdleBehavior values.
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
68
// worker tracks the state of one cloud VM instance, runs probes and
// containers on it via executor, and reports changes back to the
// pool. Mutable fields are protected by mtx (the pool's lock).
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	vcpus        int64 // NOTE(review): not referenced in this chunk — confirm use
	memory       int64 // NOTE(review): not referenced in this chunk — confirm use
	appeared     time.Time
	probed       time.Time // time of last successful probe
	updated      time.Time // time of last state/bookkeeping change
	busy         time.Time // last time a container was known to be starting/running
	destroyed    time.Time // set by shutdown()
	lastUUID     string    // most recently started/observed container UUID
	running      map[string]struct{} // remember to update state idle<->running when this changes
	starting     map[string]struct{} // remember to update state idle<->running when this changes
	probing      chan struct{} // gate ensuring at most one probe in flight (see ProbeAndUpdate)
}
91
92 // caller must have lock.
93 func (wkr *worker) startContainer(ctr arvados.Container) {
94         logger := wkr.logger.WithFields(logrus.Fields{
95                 "ContainerUUID": ctr.UUID,
96                 "Priority":      ctr.Priority,
97         })
98         logger = logger.WithField("Instance", wkr.instance)
99         logger.Debug("starting container")
100         wkr.starting[ctr.UUID] = struct{}{}
101         wkr.state = StateRunning
102         go func() {
103                 env := map[string]string{
104                         "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
105                         "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
106                 }
107                 stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
108                 wkr.mtx.Lock()
109                 defer wkr.mtx.Unlock()
110                 now := time.Now()
111                 wkr.updated = now
112                 wkr.busy = now
113                 delete(wkr.starting, ctr.UUID)
114                 wkr.running[ctr.UUID] = struct{}{}
115                 wkr.lastUUID = ctr.UUID
116                 if err != nil {
117                         logger.WithField("stdout", string(stdout)).
118                                 WithField("stderr", string(stderr)).
119                                 WithError(err).
120                                 Error("error starting crunch-run process")
121                         // Leave uuid in wkr.running, though: it's
122                         // possible the error was just a communication
123                         // failure and the process was in fact
124                         // started.  Wait for next probe to find out.
125                         return
126                 }
127                 logger.Info("crunch-run process started")
128                 wkr.lastUUID = ctr.UUID
129         }()
130 }
131
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		// Acquired the probing gate: run the probe, then
		// release the slot. (Presumably wkr.probing is
		// buffered with capacity 1 — confirm at
		// initialization.)
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		// Send would block: a probe is already in flight, so
		// skip this one.
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
146
// probeAndUpdate runs the probes appropriate for the worker's current
// state and applies the results under the pool lock. It is called
// only via ProbeAndUpdate, which ensures at most one probe runs at a
// time.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
	needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
	wkr.mtx.Unlock()
	if !needProbeBooted && !needProbeRunning {
		return
	}

	var (
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)
	if needProbeBooted {
		ok, stderr = wkr.probeBooted()
		wkr.mtx.Lock()
		if ok || wkr.state == StateRunning || wkr.state == StateIdle {
			// Boot probe passed (or the state advanced
			// past booting while we were probing), so
			// also check for running containers.
			wkr.logger.Info("instance booted; will try probeRunning")
			needProbeRunning = true
		}
		wkr.mtx.Unlock()
	}
	if needProbeRunning {
		ctrUUIDs, ok, stderr = wkr.probeRunning()
	}
	logger := wkr.logger.WithField("stderr", string(stderr))
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting && !needProbeRunning {
			// If we know the instance has never passed a
			// boot probe, it's not noteworthy that it
			// hasn't passed this probe.
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		// Shut down if it's been unresponsive too long.
		wkr.shutdownIfBroken(dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	// Rebuild the running set from the probe output, tracking
	// whether anything differs from what we had before.
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			// Container we were tracking no longer
			// appears in the probe output.
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}
	if !changed {
		return
	}

	wkr.running = running
	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if needProbeBooted {
		// First time this instance has passed its probes.
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}
262
263 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
264         cmd := "crunch-run --list"
265         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
266         if err != nil {
267                 wkr.logger.WithFields(logrus.Fields{
268                         "Command": cmd,
269                         "stdout":  string(stdout),
270                         "stderr":  string(stderr),
271                 }).WithError(err).Warn("probe failed")
272                 return nil, false, stderr
273         }
274         stdout = bytes.TrimRight(stdout, "\n")
275         if len(stdout) == 0 {
276                 return nil, true, stderr
277         }
278         return strings.Split(string(stdout), "\n"), true, stderr
279 }
280
281 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
282         cmd := wkr.wp.bootProbeCommand
283         if cmd == "" {
284                 cmd = "true"
285         }
286         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
287         logger := wkr.logger.WithFields(logrus.Fields{
288                 "Command": cmd,
289                 "stdout":  string(stdout),
290                 "stderr":  string(stderr),
291         })
292         if err != nil {
293                 logger.WithError(err).Debug("boot probe failed")
294                 return false, stderr
295         }
296         logger.Info("boot probe succeeded")
297         return true, stderr
298 }
299
300 // caller must have lock.
301 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
302         if wkr.idleBehavior == IdleBehaviorHold {
303                 return
304         }
305         label, threshold := "", wkr.wp.timeoutProbe
306         if wkr.state == StateUnknown || wkr.state == StateBooting {
307                 label, threshold = "new ", wkr.wp.timeoutBooting
308         }
309         if dur < threshold {
310                 return
311         }
312         wkr.logger.WithFields(logrus.Fields{
313                 "Duration": dur,
314                 "Since":    wkr.probed,
315                 "State":    wkr.state,
316         }).Warnf("%sinstance unresponsive, shutting down", label)
317         wkr.shutdown()
318 }
319
320 // caller must have lock.
321 func (wkr *worker) shutdownIfIdle() bool {
322         if wkr.idleBehavior == IdleBehaviorHold {
323                 return false
324         }
325         if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
326                 return false
327         }
328         age := time.Since(wkr.busy)
329         if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
330                 return false
331         }
332         wkr.logger.WithFields(logrus.Fields{
333                 "Age":          age,
334                 "IdleBehavior": wkr.idleBehavior,
335         }).Info("shutdown idle worker")
336         wkr.shutdown()
337         return true
338 }
339
340 // caller must have lock.
341 func (wkr *worker) shutdown() {
342         now := time.Now()
343         wkr.updated = now
344         wkr.destroyed = now
345         wkr.state = StateShutdown
346         go wkr.wp.notify()
347         go func() {
348                 err := wkr.instance.Destroy()
349                 if err != nil {
350                         wkr.logger.WithError(err).Warn("shutdown failed")
351                         return
352                 }
353         }()
354 }
355
356 // Save worker tags to cloud provider metadata, if they don't already
357 // match. Caller must have lock.
358 func (wkr *worker) saveTags() {
359         instance := wkr.instance
360         have := instance.Tags()
361         want := cloud.InstanceTags{
362                 tagKeyInstanceType: wkr.instType.Name,
363                 tagKeyIdleBehavior: string(wkr.idleBehavior),
364         }
365         go func() {
366                 for k, v := range want {
367                         if v == have[k] {
368                                 continue
369                         }
370                         err := instance.SetTags(want)
371                         if err != nil {
372                                 wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
373                         }
374                         break
375
376                 }
377         }()
378 }