9945: Merge branch 'master' into 9945-make-python-package-dependency-free
[arvados.git] / lib / dispatchcloud / worker / worker.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "strings"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/lib/cloud"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "github.com/Sirupsen/logrus"
16 )
17
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready. Transitions are
// driven by probeAndUpdate (unknown/booting -> idle, idle <-> running)
// and shutdown (any -> shutdown); see those methods below.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
	StateHold                  // running, but not available to run new containers
)
30
const (
	// TODO: configurable
	//
	// NOTE(review): not referenced in this file; presumably the
	// limit on how long a worker may go without a successful
	// ping/probe before being considered broken — confirm at the
	// usage site elsewhere in the package.
	maxPingFailTime = 10 * time.Minute
)
35
// stateString maps each State to its human-readable name; used by
// String and MarshalText. An unlisted State maps to "".
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
	StateHold:     "hold",
}
44
45 // String implements fmt.Stringer.
46 func (s State) String() string {
47         return stateString[s]
48 }
49
50 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
51 // map[State]anything uses the state's string representation.
52 func (s State) MarshalText() ([]byte, error) {
53         return []byte(stateString[s]), nil
54 }
55
// worker tracks the state of one cloud instance and the containers
// running (or being started) on it. All mutable fields are protected
// by mtx, which is shared with the owning Pool.
type worker struct {
	logger   logrus.FieldLogger
	executor Executor // runs shell commands (probes, crunch-run) on the instance
	wp       *Pool    // pool that owns this worker

	mtx       sync.Locker // must be wp's Locker.
	state     State
	instance  cloud.Instance
	instType  arvados.InstanceType
	vcpus     int64
	memory    int64
	appeared  time.Time // not set in this file — presumably when the instance first appeared; confirm at the Pool
	probed    time.Time // when the last successful probe completed (see probeAndUpdate)
	updated   time.Time // when any field was last modified
	busy      time.Time // when a container was last known to be starting/running here
	destroyed time.Time // when shutdown() was initiated
	lastUUID  string    // UUID of the most recently started/observed container
	running   map[string]struct{} // remember to update state idle<->running when this changes
	starting  map[string]struct{} // remember to update state idle<->running when this changes
	probing   chan struct{} // semaphore: holds a token while a probe is in flight (see ProbeAndUpdate); presumably capacity 1 — confirm at allocation site
}
77
78 // caller must have lock.
79 func (wkr *worker) startContainer(ctr arvados.Container) {
80         logger := wkr.logger.WithFields(logrus.Fields{
81                 "ContainerUUID": ctr.UUID,
82                 "Priority":      ctr.Priority,
83         })
84         logger = logger.WithField("Instance", wkr.instance)
85         logger.Debug("starting container")
86         wkr.starting[ctr.UUID] = struct{}{}
87         wkr.state = StateRunning
88         go func() {
89                 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
90                 wkr.mtx.Lock()
91                 defer wkr.mtx.Unlock()
92                 now := time.Now()
93                 wkr.updated = now
94                 wkr.busy = now
95                 delete(wkr.starting, ctr.UUID)
96                 wkr.running[ctr.UUID] = struct{}{}
97                 wkr.lastUUID = ctr.UUID
98                 if err != nil {
99                         logger.WithField("stdout", string(stdout)).
100                                 WithField("stderr", string(stderr)).
101                                 WithError(err).
102                                 Error("error starting crunch-run process")
103                         // Leave uuid in wkr.running, though: it's
104                         // possible the error was just a communication
105                         // failure and the process was in fact
106                         // started.  Wait for next probe to find out.
107                         return
108                 }
109                 logger.Info("crunch-run process started")
110                 wkr.lastUUID = ctr.UUID
111         }()
112 }
113
114 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
115 // for the worker's curent state. If a previous probe is still
116 // running, it does nothing.
117 //
118 // It should be called in a new goroutine.
119 func (wkr *worker) ProbeAndUpdate() {
120         select {
121         case wkr.probing <- struct{}{}:
122                 wkr.probeAndUpdate()
123                 <-wkr.probing
124         default:
125                 wkr.logger.Debug("still waiting for last probe to finish")
126         }
127 }
128
// probeAndUpdate runs the probes appropriate for the worker's current
// state (boot probe and/or running-containers probe), then applies
// the results: state transitions, the running-container set, and the
// busy/probed/updated timestamps.
//
// should be called in a new goroutine
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	// Snapshot wkr.updated so that, after the (unlocked) probes
	// finish, we can tell whether anything changed concurrently.
	updated := wkr.updated
	needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
	needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
	wkr.mtx.Unlock()
	if !needProbeBooted && !needProbeRunning {
		return
	}

	var (
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)
	if needProbeBooted {
		ok, stderr = wkr.probeBooted()
		wkr.mtx.Lock()
		if ok || wkr.state == StateRunning || wkr.state == StateIdle {
			// Boot probe passed (or another goroutine has
			// already moved us past booting), so also check
			// for running containers.
			wkr.logger.Info("instance booted; will try probeRunning")
			needProbeRunning = true
		}
		wkr.mtx.Unlock()
	}
	if needProbeRunning {
		ctrUUIDs, ok, stderr = wkr.probeRunning()
	}
	logger := wkr.logger.WithField("stderr", string(stderr))
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Time since the last successful probe.
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting && !needProbeRunning {
			// If we know the instance has never passed a
			// boot probe, it's not noteworthy that it
			// hasn't passed this probe.
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		wkr.shutdownIfBroken(dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	// Rebuild the running set from the probe results, noting any
	// containers that appeared or disappeared since last probe.
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		// Probes succeeded, so the instance is at least idle.
		wkr.state = StateIdle
		changed = true
	}
	if changed {
		wkr.running = running
		// Keep state consistent with the starting/running sets.
		if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
			wkr.state = StateRunning
		} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = updateTime
		go wkr.wp.notify()
	}
}
234
235 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
236         cmd := "crunch-run --list"
237         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
238         if err != nil {
239                 wkr.logger.WithFields(logrus.Fields{
240                         "Command": cmd,
241                         "stdout":  string(stdout),
242                         "stderr":  string(stderr),
243                 }).WithError(err).Warn("probe failed")
244                 return nil, false, stderr
245         }
246         stdout = bytes.TrimRight(stdout, "\n")
247         if len(stdout) == 0 {
248                 return nil, true, stderr
249         }
250         return strings.Split(string(stdout), "\n"), true, stderr
251 }
252
253 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
254         cmd := wkr.wp.bootProbeCommand
255         if cmd == "" {
256                 cmd = "true"
257         }
258         stdout, stderr, err := wkr.executor.Execute(cmd, nil)
259         logger := wkr.logger.WithFields(logrus.Fields{
260                 "Command": cmd,
261                 "stdout":  string(stdout),
262                 "stderr":  string(stderr),
263         })
264         if err != nil {
265                 logger.WithError(err).Debug("boot probe failed")
266                 return false, stderr
267         }
268         logger.Info("boot probe succeeded")
269         return true, stderr
270 }
271
272 // caller must have lock.
273 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
274         if wkr.state == StateHold {
275                 return
276         }
277         label, threshold := "", wkr.wp.timeoutProbe
278         if wkr.state == StateBooting {
279                 label, threshold = "new ", wkr.wp.timeoutBooting
280         }
281         if dur < threshold {
282                 return
283         }
284         wkr.logger.WithFields(logrus.Fields{
285                 "Duration": dur,
286                 "Since":    wkr.probed,
287                 "State":    wkr.state,
288         }).Warnf("%sinstance unresponsive, shutting down", label)
289         wkr.shutdown()
290 }
291
292 // caller must have lock.
293 func (wkr *worker) shutdownIfIdle() bool {
294         if wkr.state != StateIdle {
295                 return false
296         }
297         age := time.Since(wkr.busy)
298         if age < wkr.wp.timeoutIdle {
299                 return false
300         }
301         wkr.logger.WithField("Age", age).Info("shutdown idle worker")
302         wkr.shutdown()
303         return true
304 }
305
306 // caller must have lock
307 func (wkr *worker) shutdown() {
308         now := time.Now()
309         wkr.updated = now
310         wkr.destroyed = now
311         wkr.state = StateShutdown
312         go wkr.wp.notify()
313         go func() {
314                 err := wkr.instance.Destroy()
315                 if err != nil {
316                         wkr.logger.WithError(err).Warn("shutdown failed")
317                         return
318                 }
319         }()
320 }