// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "bytes"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Sirupsen/logrus"
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
        StateHold                  // running, but not available to run new containers
)
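
// Typical lifecycle, as implemented below (a summary added for
// orientation, not original commentary): a worker starts in
// StateUnknown or StateBooting, becomes StateIdle once a boot probe
// succeeds, moves between StateIdle and StateRunning as containers
// start and finish, and ends in StateShutdown. StateHold is set
// externally and, as shutdownIfBroken and shutdownIfIdle show below,
// exempts the worker from automatic shutdown.
//
//	Unknown/Booting -> Idle <-> Running -> Shutdown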

const (
        // TODO: configurable
        maxPingFailTime = 10 * time.Minute
)

var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
        StateHold:     "hold",
}

// String implements fmt.Stringer.
func (s State) String() string {
        return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
}
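
// For illustration (an added example, not part of the original file):
// with MarshalText defined, encoding/json renders State map keys by
// name instead of by number:
//
//	json.Marshal(map[State]int{StateIdle: 3, StateBooting: 1})
//	// -> {"booting":1,"idle":3}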

type worker struct {
        logger   logrus.FieldLogger
        executor Executor
        wp       *Pool

        mtx       sync.Locker // must be wp's Locker.
        state     State
        instance  cloud.Instance
        instType  arvados.InstanceType
        vcpus     int64
        memory    int64
        appeared  time.Time
        probed    time.Time
        updated   time.Time
        busy      time.Time
        destroyed time.Time
        lastUUID  string
        running   map[string]struct{} // remember to update state idle<->running when this changes
        starting  map[string]struct{} // remember to update state idle<->running when this changes
        probing   chan struct{}
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
        logger := wkr.logger.WithFields(logrus.Fields{
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
        logger = logger.WithField("Instance", wkr.instance)
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
        go func() {
                stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
                wkr.updated = now
                wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = struct{}{}
                wkr.lastUUID = ctr.UUID
                if err != nil {
                        logger.WithField("stdout", string(stdout)).
                                WithField("stderr", string(stderr)).
                                WithError(err).
                                Error("error starting crunch-run process")
                        // Leave uuid in wkr.running, though: it's
                        // possible the error was just a communication
                        // failure and the process was in fact
                        // started.  Wait for next probe to find out.
                        return
                }
                logger.Info("crunch-run process started")
        }()
}
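
// Note (an inference from the code above, not original commentary):
// because crunch-run runs with --detach, Execute returns once the
// supervisor process has started, not when the container finishes;
// completion is detected later by probeRunning. That is also why a
// failed Execute leaves the UUID in wkr.running: the process may have
// started despite, say, a dropped connection.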

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
        select {
        case wkr.probing <- struct{}{}:
                wkr.probeAndUpdate()
                <-wkr.probing
        default:
                wkr.logger.Debug("still waiting for last probe to finish")
        }
}
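
// The probing channel acts as a one-slot semaphore (assuming it is
// created elsewhere with make(chan struct{}, 1)), so concurrent
// callers collapse into a single in-flight probe. A minimal sketch of
// the same pattern:
//
//	sem := make(chan struct{}, 1)
//	select {
//	case sem <- struct{}{}:
//	        work() // we hold the slot
//	        <-sem  // release it
//	default:
//	        // a probe is already in flight; skip
//	}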

// probeAndUpdate does the actual work for ProbeAndUpdate; it assumes
// the caller has already claimed the wkr.probing slot.
func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
        needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
        needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
        wkr.mtx.Unlock()
        if !needProbeBooted && !needProbeRunning {
                return
        }

        var (
                ctrUUIDs []string
                ok       bool
                stderr   []byte
        )
        if needProbeBooted {
                ok, stderr = wkr.probeBooted()
                wkr.mtx.Lock()
                if ok || wkr.state == StateRunning || wkr.state == StateIdle {
                        wkr.logger.Info("instance booted; will try probeRunning")
                        needProbeRunning = true
                }
                wkr.mtx.Unlock()
        }
        if needProbeRunning {
                ctrUUIDs, ok, stderr = wkr.probeRunning()
        }
        logger := wkr.logger.WithField("stderr", string(stderr))
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
        if !ok {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
                dur := time.Since(wkr.probed)
                logger := logger.WithFields(logrus.Fields{
                        "Duration": dur,
                        "State":    wkr.state,
                })
                if wkr.state == StateBooting {
                        logger.Debug("new instance not responding")
                } else {
                        logger.Info("instance not responding")
                }
                wkr.shutdownIfBroken(dur)
                return
        }

        updateTime := time.Now()
        wkr.probed = updateTime

        if updated != wkr.updated {
                // Worker was updated after the probe began, so
                // wkr.running might have a container UUID that was
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
                return
        }

        if len(ctrUUIDs) > 0 {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
                // Actual last-busy time was sometime between wkr.busy
                // and now. Now is the earliest opportunity to take
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
        running := map[string]struct{}{}
        changed := false
        for _, uuid := range ctrUUIDs {
                running[uuid] = struct{}{}
                if _, ok := wkr.running[uuid]; !ok {
                        changed = true
                }
        }
        for uuid := range wkr.running {
                if _, ok := running[uuid]; !ok {
                        logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
                        wkr.wp.notifyExited(uuid, updateTime)
                        changed = true
                }
        }
        if wkr.state == StateUnknown || wkr.state == StateBooting {
                wkr.state = StateIdle
                changed = true
        }
        if changed {
                wkr.running = running
                if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
                        wkr.state = StateRunning
                } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
                        wkr.state = StateIdle
                }
                wkr.updated = updateTime
                go wkr.wp.notify()
        }
}

func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
        cmd := "crunch-run --list"
        stdout, stderr, err := wkr.executor.Execute(cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
                return nil, false, stderr
        }
        stdout = bytes.TrimRight(stdout, "\n")
        if len(stdout) == 0 {
                return nil, true, stderr
        }
        return strings.Split(string(stdout), "\n"), true, stderr
}
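
// probeRunning assumes "crunch-run --list" prints one container UUID
// per line and nothing else, e.g. (illustrative output):
//
//	zzzzz-dz642-aaaaaaaaaaaaaaa
//	zzzzz-dz642-bbbbbbbbbbbbbbb
//
// Empty stdout therefore means "no containers running", which is why
// the empty case still returns ok=true.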

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        cmd := wkr.wp.bootProbeCommand
        if cmd == "" {
                cmd = "true"
        }
        stdout, stderr, err := wkr.executor.Execute(cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
                "stderr":  string(stderr),
        })
        if err != nil {
                logger.WithError(err).Debug("boot probe failed")
                return false, stderr
        }
        logger.Info("boot probe succeeded")
        return true, stderr
}
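
// The boot probe command comes from the pool's configuration; a
// typical choice (an example, not mandated here) is a command that
// fails until the node can run containers, such as "docker ps -q".
// With no command configured, "true" makes the probe succeed as soon
// as the executor can connect at all.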

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) {
        if wkr.state == StateHold {
                return
        }
        label, threshold := "", wkr.wp.timeoutProbe
        if wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
                return
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
                "Since":    wkr.probed,
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
        if wkr.state != StateIdle {
                return false
        }
        age := time.Since(wkr.busy)
        if age < wkr.wp.timeoutIdle {
                return false
        }
        wkr.logger.WithField("Age", age).Info("shutdown idle worker")
        wkr.shutdown()
        return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
        wkr.destroyed = now
        wkr.state = StateShutdown
        go wkr.wp.notify()
        go func() {
                if err := wkr.instance.Destroy(); err != nil {
                        wkr.logger.WithError(err).Warn("shutdown failed")
                }
        }()
}