1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.curoverse.com/arvados.git/lib/cloud"
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "github.com/Sirupsen/logrus"
19 // State indicates whether a worker is available to do work, and (if
20 // not) whether/when it is expected to become ready.
24 StateUnknown State = iota // might be running a container already
25 StateBooting // instance is booting
26 StateIdle // instance booted, no containers are running
27 StateRunning // instance is running one or more containers
28 StateShutdown // worker has stopped monitoring the instance
29 StateHold // running, but not available to run new containers
34 maxPingFailTime = 10 * time.Minute
37 var stateString = map[State]string{
38 StateUnknown: "unknown",
39 StateBooting: "booting",
41 StateRunning: "running",
42 StateShutdown: "shutdown",
46 // String implements fmt.Stringer.
47 func (s State) String() string {
51 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
52 // map[State]anything uses the state's string representation.
53 func (s State) MarshalText() ([]byte, error) {
54 return []byte(stateString[s]), nil
58 logger logrus.FieldLogger
62 mtx sync.Locker // must be wp's Locker.
64 instance cloud.Instance
65 instType arvados.InstanceType
74 running map[string]struct{} // remember to update state idle<->running when this changes
75 starting map[string]struct{} // remember to update state idle<->running when this changes
79 // caller must have lock.
80 func (wkr *worker) startContainer(ctr arvados.Container) {
81 logger := wkr.logger.WithFields(logrus.Fields{
82 "ContainerUUID": ctr.UUID,
83 "Priority": ctr.Priority,
85 logger = logger.WithField("Instance", wkr.instance)
86 logger.Debug("starting container")
87 wkr.starting[ctr.UUID] = struct{}{}
88 wkr.state = StateRunning
90 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
92 defer wkr.mtx.Unlock()
96 delete(wkr.starting, ctr.UUID)
97 wkr.running[ctr.UUID] = struct{}{}
98 wkr.lastUUID = ctr.UUID
100 logger.WithField("stdout", string(stdout)).
101 WithField("stderr", string(stderr)).
103 Error("error starting crunch-run process")
104 // Leave uuid in wkr.running, though: it's
105 // possible the error was just a communication
106 // failure and the process was in fact
107 // started. Wait for next probe to find out.
110 logger.Info("crunch-run process started")
111 wkr.lastUUID = ctr.UUID
115 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
116 // for the worker's current state. If a previous probe is still
117 // running, it does nothing.
119 // It should be called in a new goroutine.
120 func (wkr *worker) ProbeAndUpdate() {
122 case wkr.probing <- struct{}{}:
126 wkr.logger.Debug("still waiting for last probe to finish")
130 // probeAndUpdate should be called in a new goroutine.
131 func (wkr *worker) probeAndUpdate() {
133 updated := wkr.updated
134 needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
135 needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
137 if !needProbeBooted && !needProbeRunning {
147 ok, stderr = wkr.probeBooted()
149 if ok || wkr.state == StateRunning || wkr.state == StateIdle {
150 wkr.logger.Info("instance booted; will try probeRunning")
151 needProbeRunning = true
155 if needProbeRunning {
156 ctrUUIDs, ok, stderr = wkr.probeRunning()
158 logger := wkr.logger.WithField("stderr", string(stderr))
160 defer wkr.mtx.Unlock()
162 if wkr.state == StateShutdown && wkr.updated.After(updated) {
163 // Skip the logging noise if shutdown was
164 // initiated during probe.
167 dur := time.Since(wkr.probed)
168 logger := logger.WithFields(logrus.Fields{
172 if wkr.state == StateBooting && !needProbeRunning {
173 // If we know the instance has never passed a
174 // boot probe, it's not noteworthy that it
175 // hasn't passed this probe.
176 logger.Debug("new instance not responding")
178 logger.Info("instance not responding")
180 wkr.shutdownIfBroken(dur)
184 updateTime := time.Now()
185 wkr.probed = updateTime
187 if updated != wkr.updated {
188 // Worker was updated after the probe began, so
189 // wkr.running might have a container UUID that was
190 // not yet running when ctrUUIDs was generated. Leave
191 // wkr.running alone and wait for the next probe to
192 // catch up on any changes.
196 if len(ctrUUIDs) > 0 {
197 wkr.busy = updateTime
198 wkr.lastUUID = ctrUUIDs[0]
199 } else if len(wkr.running) > 0 {
200 // Actual last-busy time was sometime between wkr.busy
201 // and now. Now is the earliest opportunity to take
202 // advantage of the non-busy state, though.
203 wkr.busy = updateTime
205 running := map[string]struct{}{}
207 for _, uuid := range ctrUUIDs {
208 running[uuid] = struct{}{}
209 if _, ok := wkr.running[uuid]; !ok {
213 for uuid := range wkr.running {
214 if _, ok := running[uuid]; !ok {
215 logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
216 wkr.wp.notifyExited(uuid, updateTime)
220 if wkr.state == StateUnknown || wkr.state == StateBooting {
221 wkr.state = StateIdle
225 wkr.running = running
226 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
227 wkr.state = StateRunning
228 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
229 wkr.state = StateIdle
231 wkr.updated = updateTime
236 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
237 cmd := "crunch-run --list"
238 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
240 wkr.logger.WithFields(logrus.Fields{
242 "stdout": string(stdout),
243 "stderr": string(stderr),
244 }).WithError(err).Warn("probe failed")
245 return nil, false, stderr
247 stdout = bytes.TrimRight(stdout, "\n")
248 if len(stdout) == 0 {
249 return nil, true, stderr
251 return strings.Split(string(stdout), "\n"), true, stderr
254 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
255 cmd := wkr.wp.bootProbeCommand
259 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
260 logger := wkr.logger.WithFields(logrus.Fields{
262 "stdout": string(stdout),
263 "stderr": string(stderr),
266 logger.WithError(err).Debug("boot probe failed")
269 logger.Info("boot probe succeeded")
273 // caller must have lock.
274 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
275 if wkr.state == StateHold {
278 label, threshold := "", wkr.wp.timeoutProbe
279 if wkr.state == StateBooting {
280 label, threshold = "new ", wkr.wp.timeoutBooting
285 wkr.logger.WithFields(logrus.Fields{
289 }).Warnf("%sinstance unresponsive, shutting down", label)
293 // caller must have lock.
294 func (wkr *worker) shutdownIfIdle() bool {
295 if wkr.state != StateIdle {
298 age := time.Since(wkr.busy)
299 if age < wkr.wp.timeoutIdle {
302 wkr.logger.WithField("Age", age).Info("shutdown idle worker")
307 // caller must have lock
308 func (wkr *worker) shutdown() {
312 wkr.state = StateShutdown
315 err := wkr.instance.Destroy(context.TODO())
317 wkr.logger.WithError(err).Warn("shutdown failed")