14807: Load API host/token directly from stdin without shell hack.
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "strings"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/lib/cloud"
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "github.com/sirupsen/logrus"
18 )
19
const (
	// TODO: configurable
	// maxPingFailTime is how long a worker can go without a
	// successful ping before being treated as failed.
	// NOTE(review): not referenced in this part of the file;
	// presumably consumed by the pool — confirm before changing.
	maxPingFailTime = 10 * time.Minute
)
24
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

// stateString maps each State to its human-readable label; used by
// String and MarshalText below.
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}
44
45 // String implements fmt.Stringer.
46 func (s State) String() string {
47         return stateString[s]
48 }
49
50 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
51 // map[State]anything uses the state's string representation.
52 func (s State) MarshalText() ([]byte, error) {
53         return []byte(stateString[s]), nil
54 }
55
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

// validIdleBehavior enumerates the recognized IdleBehavior values.
// NOTE(review): not referenced in this part of the file; presumably
// used for input validation elsewhere in the package — confirm.
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
70
// A worker tracks the dispatcher's view of one cloud instance: its
// lifecycle state, the containers running on it, and the timestamps
// used to decide when to probe it or shut it down.
type worker struct {
	logger   logrus.FieldLogger
	executor Executor // runs shell commands on the instance
	wp       *Pool    // pool this worker belongs to

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	vcpus        int64
	memory       int64
	appeared     time.Time // NOTE(review): not set in this file; presumably when the pool first saw the instance — confirm
	probed       time.Time // when the last successful probe finished
	updated      time.Time // when any of these fields last changed
	busy         time.Time // last time containers were seen running (or started) here
	destroyed    time.Time // when shutdown() was called
	lastUUID     string    // most recently started/observed container UUID
	running      map[string]struct{} // remember to update state idle<->running when this changes
	starting     map[string]struct{} // remember to update state idle<->running when this changes
	probing      chan struct{} // 1-slot semaphore: at most one probe in flight
}
93
94 // caller must have lock.
95 func (wkr *worker) startContainer(ctr arvados.Container) {
96         logger := wkr.logger.WithFields(logrus.Fields{
97                 "ContainerUUID": ctr.UUID,
98                 "Priority":      ctr.Priority,
99         })
100         logger = logger.WithField("Instance", wkr.instance.ID())
101         logger.Debug("starting container")
102         wkr.starting[ctr.UUID] = struct{}{}
103         wkr.state = StateRunning
104         go func() {
105                 env := map[string]string{
106                         "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
107                         "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
108                 }
109                 if wkr.wp.arvClient.Insecure {
110                         env["ARVADOS_API_HOST_INSECURE"] = "1"
111                 }
112                 envJSON, err := json.Marshal(env)
113                 if err != nil {
114                         panic(err)
115                 }
116                 stdin := bytes.NewBuffer(envJSON)
117                 cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
118                 if u := wkr.instance.RemoteUser(); u != "root" {
119                         cmd = "sudo " + cmd
120                 }
121                 stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
122                 wkr.mtx.Lock()
123                 defer wkr.mtx.Unlock()
124                 now := time.Now()
125                 wkr.updated = now
126                 wkr.busy = now
127                 delete(wkr.starting, ctr.UUID)
128                 wkr.running[ctr.UUID] = struct{}{}
129                 wkr.lastUUID = ctr.UUID
130                 if err != nil {
131                         logger.WithField("stdout", string(stdout)).
132                                 WithField("stderr", string(stderr)).
133                                 WithError(err).
134                                 Error("error starting crunch-run process")
135                         // Leave uuid in wkr.running, though: it's
136                         // possible the error was just a communication
137                         // failure and the process was in fact
138                         // started.  Wait for next probe to find out.
139                         return
140                 }
141                 logger.Info("crunch-run process started")
142                 wkr.lastUUID = ctr.UUID
143         }()
144 }
145
146 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
147 // for the worker's curent state. If a previous probe is still
148 // running, it does nothing.
149 //
150 // It should be called in a new goroutine.
151 func (wkr *worker) ProbeAndUpdate() {
152         select {
153         case wkr.probing <- struct{}{}:
154                 wkr.probeAndUpdate()
155                 <-wkr.probing
156         default:
157                 wkr.logger.Debug("still waiting for last probe to finish")
158         }
159 }
160
161 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
162 // updates state accordingly.
163 //
164 // In StateUnknown: Call both probeBooted and probeRunning.
165 // In StateBooting: Call probeBooted; if successful, call probeRunning.
166 // In StateRunning: Call probeRunning.
167 // In StateIdle: Call probeRunning.
168 // In StateShutdown: Do nothing.
169 //
170 // If both probes succeed, wkr.state changes to
171 // StateIdle/StateRunning.
172 //
173 // If probeRunning succeeds, wkr.running is updated. (This means
174 // wkr.running might be non-empty even in StateUnknown, if the boot
175 // probe failed.)
176 //
177 // probeAndUpdate should be called in a new goroutine.
178 func (wkr *worker) probeAndUpdate() {
179         wkr.mtx.Lock()
180         updated := wkr.updated
181         initialState := wkr.state
182         wkr.mtx.Unlock()
183
184         var (
185                 booted   bool
186                 ctrUUIDs []string
187                 ok       bool
188                 stderr   []byte // from probeBooted
189         )
190
191         switch initialState {
192         case StateShutdown:
193                 return
194         case StateIdle, StateRunning:
195                 booted = true
196         case StateUnknown, StateBooting:
197         default:
198                 panic(fmt.Sprintf("unknown state %s", initialState))
199         }
200
201         probeStart := time.Now()
202         logger := wkr.logger.WithField("ProbeStart", probeStart)
203
204         if !booted {
205                 booted, stderr = wkr.probeBooted()
206                 if !booted {
207                         // Pretend this probe succeeded if another
208                         // concurrent attempt succeeded.
209                         wkr.mtx.Lock()
210                         booted = wkr.state == StateRunning || wkr.state == StateIdle
211                         wkr.mtx.Unlock()
212                 }
213                 if booted {
214                         logger.Info("instance booted; will try probeRunning")
215                 }
216         }
217         if booted || wkr.state == StateUnknown {
218                 ctrUUIDs, ok = wkr.probeRunning()
219         }
220         wkr.mtx.Lock()
221         defer wkr.mtx.Unlock()
222         if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
223                 if wkr.state == StateShutdown && wkr.updated.After(updated) {
224                         // Skip the logging noise if shutdown was
225                         // initiated during probe.
226                         return
227                 }
228                 // Using the start time of the probe as the timeout
229                 // threshold ensures we always initiate at least one
230                 // probe attempt after the boot/probe timeout expires
231                 // (otherwise, a slow probe failure could cause us to
232                 // shutdown an instance even though it did in fact
233                 // boot/recover before the timeout expired).
234                 dur := probeStart.Sub(wkr.probed)
235                 if wkr.shutdownIfBroken(dur) {
236                         // stderr from failed run-probes will have
237                         // been logged already, but boot-probe
238                         // failures are normal so they are logged only
239                         // at Debug level. This is our chance to log
240                         // some evidence about why the node never
241                         // booted, even in non-debug mode.
242                         if !booted {
243                                 logger.WithFields(logrus.Fields{
244                                         "Duration": dur,
245                                         "stderr":   string(stderr),
246                                 }).Info("boot failed")
247                         }
248                 }
249                 return
250         }
251
252         updateTime := time.Now()
253         wkr.probed = updateTime
254
255         if updated != wkr.updated {
256                 // Worker was updated after the probe began, so
257                 // wkr.running might have a container UUID that was
258                 // not yet running when ctrUUIDs was generated. Leave
259                 // wkr.running alone and wait for the next probe to
260                 // catch up on any changes.
261                 return
262         }
263
264         if len(ctrUUIDs) > 0 {
265                 wkr.busy = updateTime
266                 wkr.lastUUID = ctrUUIDs[0]
267         } else if len(wkr.running) > 0 {
268                 // Actual last-busy time was sometime between wkr.busy
269                 // and now. Now is the earliest opportunity to take
270                 // advantage of the non-busy state, though.
271                 wkr.busy = updateTime
272         }
273         changed := false
274
275         // Build a new "running" map. Set changed=true if it differs
276         // from the existing map (wkr.running) to ensure the scheduler
277         // gets notified below.
278         running := map[string]struct{}{}
279         for _, uuid := range ctrUUIDs {
280                 running[uuid] = struct{}{}
281                 if _, ok := wkr.running[uuid]; !ok {
282                         if _, ok := wkr.starting[uuid]; !ok {
283                                 // We didn't start it -- it must have
284                                 // been started by a previous
285                                 // dispatcher process.
286                                 logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
287                         }
288                         changed = true
289                 }
290         }
291         for uuid := range wkr.running {
292                 if _, ok := running[uuid]; !ok {
293                         logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
294                         wkr.wp.notifyExited(uuid, updateTime)
295                         changed = true
296                 }
297         }
298
299         // Update state if this was the first successful boot-probe.
300         if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
301                 // Note: this will change again below if
302                 // len(wkr.starting)+len(wkr.running) > 0.
303                 wkr.state = StateIdle
304                 changed = true
305         }
306
307         // If wkr.state and wkr.running aren't changing then there's
308         // no need to log anything, notify the scheduler, move state
309         // back and forth between idle/running, etc.
310         if !changed {
311                 return
312         }
313
314         // Log whenever a run-probe reveals crunch-run processes
315         // appearing/disappearing before boot-probe succeeds.
316         if wkr.state == StateUnknown && len(running) != len(wkr.running) {
317                 logger.WithFields(logrus.Fields{
318                         "RunningContainers": len(running),
319                         "State":             wkr.state,
320                 }).Info("crunch-run probe succeeded, but boot probe is still failing")
321         }
322
323         wkr.running = running
324         if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
325                 wkr.state = StateRunning
326         } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
327                 wkr.state = StateIdle
328         }
329         wkr.updated = updateTime
330         if booted && (initialState == StateUnknown || initialState == StateBooting) {
331                 logger.WithFields(logrus.Fields{
332                         "RunningContainers": len(running),
333                         "State":             wkr.state,
334                 }).Info("probes succeeded, instance is in service")
335         }
336         go wkr.wp.notify()
337 }
338
339 func (wkr *worker) probeRunning() (running []string, ok bool) {
340         cmd := "crunch-run --list"
341         if u := wkr.instance.RemoteUser(); u != "root" {
342                 cmd = "sudo " + cmd
343         }
344         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
345         if err != nil {
346                 wkr.logger.WithFields(logrus.Fields{
347                         "Command": cmd,
348                         "stdout":  string(stdout),
349                         "stderr":  string(stderr),
350                 }).WithError(err).Warn("probe failed")
351                 return nil, false
352         }
353         stdout = bytes.TrimRight(stdout, "\n")
354         if len(stdout) == 0 {
355                 return nil, true
356         }
357         return strings.Split(string(stdout), "\n"), true
358 }
359
360 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
361         cmd := wkr.wp.bootProbeCommand
362         if cmd == "" {
363                 cmd = "true"
364         }
365         stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
366         logger := wkr.logger.WithFields(logrus.Fields{
367                 "Command": cmd,
368                 "stdout":  string(stdout),
369                 "stderr":  string(stderr),
370         })
371         if err != nil {
372                 logger.WithError(err).Debug("boot probe failed")
373                 return false, stderr
374         }
375         logger.Info("boot probe succeeded")
376         return true, stderr
377 }
378
379 // caller must have lock.
380 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
381         if wkr.idleBehavior == IdleBehaviorHold {
382                 // Never shut down.
383                 return false
384         }
385         label, threshold := "", wkr.wp.timeoutProbe
386         if wkr.state == StateUnknown || wkr.state == StateBooting {
387                 label, threshold = "new ", wkr.wp.timeoutBooting
388         }
389         if dur < threshold {
390                 return false
391         }
392         wkr.logger.WithFields(logrus.Fields{
393                 "Duration": dur,
394                 "Since":    wkr.probed,
395                 "State":    wkr.state,
396         }).Warnf("%sinstance unresponsive, shutting down", label)
397         wkr.shutdown()
398         return true
399 }
400
401 // caller must have lock.
402 func (wkr *worker) shutdownIfIdle() bool {
403         if wkr.idleBehavior == IdleBehaviorHold {
404                 // Never shut down.
405                 return false
406         }
407         age := time.Since(wkr.busy)
408
409         old := age >= wkr.wp.timeoutIdle
410         draining := wkr.idleBehavior == IdleBehaviorDrain
411         shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
412                 (draining && wkr.state == StateBooting)
413         if !shouldShutdown {
414                 return false
415         }
416
417         wkr.logger.WithFields(logrus.Fields{
418                 "State":        wkr.state,
419                 "Age":          age,
420                 "IdleBehavior": wkr.idleBehavior,
421         }).Info("shutdown idle worker")
422         wkr.shutdown()
423         return true
424 }
425
426 // caller must have lock.
427 func (wkr *worker) shutdown() {
428         now := time.Now()
429         wkr.updated = now
430         wkr.destroyed = now
431         wkr.state = StateShutdown
432         go wkr.wp.notify()
433         go func() {
434                 err := wkr.instance.Destroy()
435                 if err != nil {
436                         wkr.logger.WithError(err).Warn("shutdown failed")
437                         return
438                 }
439         }()
440 }
441
442 // Save worker tags to cloud provider metadata, if they don't already
443 // match. Caller must have lock.
444 func (wkr *worker) saveTags() {
445         instance := wkr.instance
446         have := instance.Tags()
447         want := cloud.InstanceTags{
448                 tagKeyInstanceType: wkr.instType.Name,
449                 tagKeyIdleBehavior: string(wkr.idleBehavior),
450         }
451         go func() {
452                 for k, v := range want {
453                         if v == have[k] {
454                                 continue
455                         }
456                         err := instance.SetTags(want)
457                         if err != nil {
458                                 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
459                         }
460                         break
461
462                 }
463         }()
464 }