14360: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "strings"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/lib/cloud"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "github.com/Sirupsen/logrus"
16 )
17
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
	StateHold                  // running, but not available to run new containers
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// stateString maps each State to its human-readable name.
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
	StateHold:     "hold",
}

// String implements fmt.Stringer. An out-of-range State yields "".
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(s.String()), nil
}
55
// worker is the dispatcher's view of a single cloud instance: its
// lifecycle state, the containers starting/running on it, and the
// timestamps used to decide when to probe or shut it down.
type worker struct {
	logger   logrus.FieldLogger
	executor Executor // runs commands (crunch-run, boot probe) on the instance
	wp       *Pool    // pool that owns this worker; provides config and notify hooks

	mtx       sync.Locker // must be wp's Locker.
	state     State
	instance  cloud.Instance
	instType  arvados.InstanceType
	vcpus     int64 // NOTE(review): not used in this file -- presumably instance size; confirm against pool code
	memory    int64 // NOTE(review): not used in this file -- presumably instance size; confirm against pool code
	probed    time.Time // last successful probe
	updated   time.Time // last change to state / running / starting
	busy      time.Time // last time a container was seen running or was started
	destroyed time.Time // when shutdown() initiated instance destroy
	lastUUID  string    // most recently started or listed container UUID
	running   map[string]struct{} // remember to update state idle<->running when this changes
	starting  map[string]struct{} // remember to update state idle<->running when this changes
	probing   chan struct{} // 1-token channel: full while a probe is in progress
}
76
77 // caller must have lock.
78 func (wkr *worker) startContainer(ctr arvados.Container) {
79         logger := wkr.logger.WithFields(logrus.Fields{
80                 "ContainerUUID": ctr.UUID,
81                 "Priority":      ctr.Priority,
82         })
83         logger = logger.WithField("Instance", wkr.instance)
84         logger.Debug("starting container")
85         wkr.starting[ctr.UUID] = struct{}{}
86         wkr.state = StateRunning
87         go func() {
88                 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
89                 wkr.mtx.Lock()
90                 defer wkr.mtx.Unlock()
91                 now := time.Now()
92                 wkr.updated = now
93                 wkr.busy = now
94                 delete(wkr.starting, ctr.UUID)
95                 wkr.running[ctr.UUID] = struct{}{}
96                 wkr.lastUUID = ctr.UUID
97                 if err != nil {
98                         logger.WithField("stdout", string(stdout)).
99                                 WithField("stderr", string(stderr)).
100                                 WithError(err).
101                                 Error("error starting crunch-run process")
102                         // Leave uuid in wkr.running, though: it's
103                         // possible the error was just a communication
104                         // failure and the process was in fact
105                         // started.  Wait for next probe to find out.
106                         return
107                 }
108                 logger.Info("crunch-run process started")
109                 wkr.lastUUID = ctr.UUID
110         }()
111 }
112
113 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
114 // for the worker's curent state. If a previous probe is still
115 // running, it does nothing.
116 //
117 // It should be called in a new goroutine.
118 func (wkr *worker) ProbeAndUpdate() {
119         select {
120         case wkr.probing <- struct{}{}:
121                 wkr.probeAndUpdate()
122                 <-wkr.probing
123         default:
124                 wkr.logger.Debug("still waiting for last probe to finish")
125         }
126 }
127
// probeAndUpdate runs the probes appropriate to the worker's current
// state (boot probe and/or "crunch-run --list"), then reconciles
// wkr.running, wkr.busy, and wkr.state with the results.
//
// should be called in a new goroutine
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	// Snapshot wkr.updated so we can detect concurrent changes
	// made while the (slow, unlocked) probes run.
	updated := wkr.updated
	needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
	needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
	wkr.mtx.Unlock()
	if !needProbeBooted && !needProbeRunning {
		return
	}

	var (
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)
	if needProbeBooted {
		ok, stderr = wkr.probeBooted()
		wkr.mtx.Lock()
		// Even if the boot probe failed, go on to the running
		// probe if another goroutine has meanwhile moved the
		// worker to a running/idle state.
		if ok || wkr.state == StateRunning || wkr.state == StateIdle {
			wkr.logger.Info("instance booted; will try probeRunning")
			needProbeRunning = true
		}
		wkr.mtx.Unlock()
	}
	if needProbeRunning {
		// ok/stderr now reflect the running probe, not the
		// boot probe.
		ctrUUIDs, ok, stderr = wkr.probeRunning()
	}
	logger := wkr.logger.WithField("stderr", string(stderr))
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting {
			// Probe failures are normal while an instance
			// is still booting, so log at debug level.
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		wkr.shutdownIfBroken(dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	// Rebuild the running set from the probe output, noting
	// containers that appeared or disappeared since last probe.
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		// A successful probe implies the instance is booted.
		wkr.state = StateIdle
		changed = true
	}
	if changed {
		wkr.running = running
		// Keep state consistent with the number of containers
		// currently starting or running.
		if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
			wkr.state = StateRunning
		} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = updateTime
		go wkr.wp.notify()
	}
}
230
231 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
232         cmd := "crunch-run --list"
233         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
234         if err != nil {
235                 wkr.logger.WithFields(logrus.Fields{
236                         "Command": cmd,
237                         "stdout":  string(stdout),
238                         "stderr":  string(stderr),
239                 }).WithError(err).Warn("probe failed")
240                 return nil, false, stderr
241         }
242         stdout = bytes.TrimRight(stdout, "\n")
243         if len(stdout) == 0 {
244                 return nil, true, stderr
245         }
246         return strings.Split(string(stdout), "\n"), true, stderr
247 }
248
249 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
250         cmd := wkr.wp.bootProbeCommand
251         if cmd == "" {
252                 cmd = "true"
253         }
254         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
255         logger := wkr.logger.WithFields(logrus.Fields{
256                 "Command": cmd,
257                 "stdout":  string(stdout),
258                 "stderr":  string(stderr),
259         })
260         if err != nil {
261                 logger.WithError(err).Debug("boot probe failed")
262                 return false, stderr
263         }
264         logger.Info("boot probe succeeded")
265         return true, stderr
266 }
267
268 // caller must have lock.
269 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
270         if wkr.state == StateHold {
271                 return
272         }
273         label, threshold := "", wkr.wp.timeoutProbe
274         if wkr.state == StateBooting {
275                 label, threshold = "new ", wkr.wp.timeoutBooting
276         }
277         if dur < threshold {
278                 return
279         }
280         wkr.logger.WithFields(logrus.Fields{
281                 "Duration": dur,
282                 "Since":    wkr.probed,
283                 "State":    wkr.state,
284         }).Warnf("%sinstance unresponsive, shutting down", label)
285         wkr.shutdown()
286 }
287
288 // caller must have lock.
289 func (wkr *worker) shutdownIfIdle() bool {
290         if wkr.state != StateIdle {
291                 return false
292         }
293         age := time.Since(wkr.busy)
294         if age < wkr.wp.timeoutIdle {
295                 return false
296         }
297         wkr.logger.WithField("Age", age).Info("shutdown idle worker")
298         wkr.shutdown()
299         return true
300 }
301
302 // caller must have lock
303 func (wkr *worker) shutdown() {
304         now := time.Now()
305         wkr.updated = now
306         wkr.destroyed = now
307         wkr.state = StateShutdown
308         go func() {
309                 err := wkr.instance.Destroy()
310                 if err != nil {
311                         wkr.logger.WithError(err).Warn("shutdown failed")
312                         return
313                 }
314         }()
315 }