1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.curoverse.com/arvados.git/lib/cloud"
14 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 "github.com/Sirupsen/logrus"
18 // State indicates whether a worker is available to do work, and (if
19 // not) whether/when it is expected to become ready.
// The states below are numbered from iota = 0 in declaration order, so
// StateUnknown is the zero value of State. Transitions visible in this
// file: a successful probe moves Unknown/Booting to Idle, Idle and
// Running toggle as containers start and finish, and shutdown sets
// StateShutdown. shutdownIfBroken special-cases StateHold.
23 StateUnknown State = iota // might be running a container already
24 StateBooting // instance is booting
25 StateIdle // instance booted, no containers are running
26 StateRunning // instance is running one or more containers
27 StateShutdown // worker has stopped monitoring the instance
28 StateHold // running, but not available to run new containers
// maxPingFailTime is 10 minutes. Nothing in this view references it.
// NOTE(review): confirm it is used elsewhere in the package; if not, it
// is dead code and can be removed.
33 maxPingFailTime = 10 * time.Minute
// stateString maps each State to its text form. MarshalText (and
// presumably String) look names up here, so a State with no entry
// renders as the empty string "".
// NOTE(review): entries for StateIdle and StateHold are not visible in
// this listing (the embedded numbering skips their lines) — confirm they
// are present, otherwise those states marshal as "".
36 var stateString = map[State]string{
37 StateUnknown: "unknown",
38 StateBooting: "booting",
40 StateRunning: "running",
41 StateShutdown: "shutdown",
45 // String implements fmt.Stringer.
// NOTE(review): the body is not visible in this listing; presumably it
// returns stateString[s], matching MarshalText — confirm.
46 func (s State) String() string {
50 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
51 // map[State]anything uses the state's string representation.
// The error result is always nil. A State that has no entry in
// stateString marshals as an empty string.
52 func (s State) MarshalText() ([]byte, error) {
53 return []byte(stateString[s]), nil
// worker tracks a single cloud instance: its current State, the
// containers it is starting or running, and the probe results that keep
// that view current. Not every field is visible in this listing.
57 logger logrus.FieldLogger
// mtx protects the worker's mutable fields (state, running, starting,
// ...). Methods commented "caller must have lock" expect it to be held
// on entry.
61 mtx sync.Locker // must be wp's Locker.
63 instance cloud.Instance
64 instType arvados.InstanceType
// running and starting are sets keyed by container UUID. starting holds
// containers whose "crunch-run --detach" command has been issued but has
// not returned yet. running holds containers believed to be running on
// the instance; probeAndUpdate replaces it with the probe's result.
72 running map[string]struct{} // remember to update state idle<->running when this changes
73 starting map[string]struct{} // remember to update state idle<->running when this changes
// startContainer launches ctr on this worker's instance by running
// "crunch-run --detach '<uuid>'" through wkr.executor.
//
// Before the command runs, the container is added to wkr.starting and
// the worker is marked StateRunning. When the command returns, the
// container moves from wkr.starting to wkr.running. This happens even if
// the command reported an error; the next probe confirms whether the
// process really started.
//
77 // caller must have lock.
78 func (wkr *worker) startContainer(ctr arvados.Container) {
79 logger := wkr.logger.WithFields(logrus.Fields{
80 "ContainerUUID": ctr.UUID,
81 "Priority": ctr.Priority,
83 logger = logger.WithField("Instance", wkr.instance)
84 logger.Debug("starting container")
85 wkr.starting[ctr.UUID] = struct{}{}
86 wkr.state = StateRunning
// NOTE(review): Execute is followed by `defer wkr.mtx.Unlock()`, so it
// appears to run without the caller's lock. Presumably it runs inside a
// goroutine that re-locks afterwards; those lines are not visible here.
// ctr.UUID is spliced into a shell command inside single quotes. That is
// only safe if a UUID can never contain a quote — presumably guaranteed
// by the Arvados UUID format; confirm.
88 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
90 defer wkr.mtx.Unlock()
94 delete(wkr.starting, ctr.UUID)
95 wkr.running[ctr.UUID] = struct{}{}
96 wkr.lastUUID = ctr.UUID
98 logger.WithField("stdout", string(stdout)).
99 WithField("stderr", string(stderr)).
101 Error("error starting crunch-run process")
102 // Leave uuid in wkr.running, though: it's
103 // possible the error was just a communication
104 // failure and the process was in fact
105 // started. Wait for next probe to find out.
108 logger.Info("crunch-run process started")
// NOTE(review): lastUUID was already set to ctr.UUID above, before the
// error check, so this assignment appears redundant.
109 wkr.lastUUID = ctr.UUID
113 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
114 // for the worker's current state. If a previous probe is still
115 // running, it does nothing.
117 // It should be called in a new goroutine.
118 func (wkr *worker) ProbeAndUpdate() {
// A send on wkr.probing claims the probe slot. If the send cannot
// proceed (presumably via a select default, not visible here), another
// probe is already in flight, and this call only logs at Debug level.
// The slot is presumably released by a receive after probeAndUpdate
// returns — confirm.
120 case wkr.probing <- struct{}{}:
124 wkr.logger.Debug("still waiting for last probe to finish")
// probeAndUpdate probes the instance and reconciles the worker's
// bookkeeping with the result:
//   - Unknown/Booting workers get the boot probe (probeBooted).
//   - Idle/Running workers, and any worker whose boot probe succeeded,
//     get the running-containers probe (probeRunning).
//   - On probe failure, shutdownIfBroken decides whether to give up on
//     the instance, given the time since wkr.probed.
//   - On success, wkr.running, wkr.busy, wkr.lastUUID and wkr.state are
//     updated from the container UUIDs crunch-run reported.
//
128 // should be called in a new goroutine
129 func (wkr *worker) probeAndUpdate() {
// Remember wkr.updated as of probe start, so changes made while the
// probe is running can be detected afterwards.
131 updated := wkr.updated
132 needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
133 needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
135 if !needProbeBooted && !needProbeRunning {
145 ok, stderr = wkr.probeBooted()
// Go on to probeRunning if the boot probe succeeded, or if the worker is
// (by now) in Idle/Running.
147 if ok || wkr.state == StateRunning || wkr.state == StateIdle {
148 wkr.logger.Info("instance booted; will try probeRunning")
149 needProbeRunning = true
153 if needProbeRunning {
154 ctrUUIDs, ok, stderr = wkr.probeRunning()
156 logger := wkr.logger.WithField("stderr", string(stderr))
158 defer wkr.mtx.Unlock()
160 if wkr.state == StateShutdown && wkr.updated.After(updated) {
161 // Skip the logging noise if shutdown was
162 // initiated during probe.
// dur is measured from wkr.probed. In this view, wkr.probed is set only
// on the success path below, so (assuming this failure branch returns
// before that point) dur is the time since the last successful probe.
165 dur := time.Since(wkr.probed)
166 logger := logger.WithFields(logrus.Fields{
170 if wkr.state == StateBooting {
171 logger.Debug("new instance not responding")
173 logger.Info("instance not responding")
175 wkr.shutdownIfBroken(dur)
179 updateTime := time.Now()
180 wkr.probed = updateTime
182 if updated != wkr.updated {
183 // Worker was updated after the probe began, so
184 // wkr.running might have a container UUID that was
185 // not yet running when ctrUUIDs was generated. Leave
186 // wkr.running alone and wait for the next probe to
187 // catch up on any changes.
191 if len(ctrUUIDs) > 0 {
192 wkr.busy = updateTime
193 wkr.lastUUID = ctrUUIDs[0]
194 } else if len(wkr.running) > 0 {
195 // Actual last-busy time was sometime between wkr.busy
196 // and now. Now is the earliest opportunity to take
197 // advantage of the non-busy state, though.
198 wkr.busy = updateTime
// Build the new running set from the probe result. Containers that were
// in wkr.running but are no longer listed have exited; report each one
// to the pool with notifyExited.
200 running := map[string]struct{}{}
202 for _, uuid := range ctrUUIDs {
203 running[uuid] = struct{}{}
204 if _, ok := wkr.running[uuid]; !ok {
208 for uuid := range wkr.running {
209 if _, ok := running[uuid]; !ok {
210 logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
211 wkr.wp.notifyExited(uuid, updateTime)
// A successful probe promotes an Unknown/Booting worker to Idle. The
// Idle/Running adjustment below may then move it on to Running.
215 if wkr.state == StateUnknown || wkr.state == StateBooting {
216 wkr.state = StateIdle
220 wkr.running = running
// Keep Idle/Running consistent with whether any containers are starting
// or running.
221 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
222 wkr.state = StateRunning
223 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
224 wkr.state = StateIdle
226 wkr.updated = updateTime
// probeRunning runs "crunch-run --list" on the instance. It returns the
// output split into lines, presumably one container UUID per line.
// When the command reports an error, ok is false and the list is nil.
// Empty output yields a nil list with ok true. stderr is returned
// either way so the caller can log it.
// NOTE(review): only trailing "\n" characters are trimmed, so blank
// interior lines or "\r\n" line endings would produce empty or
// "\r"-suffixed entries — confirm crunch-run's output format.
231 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
232 cmd := "crunch-run --list"
233 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
235 wkr.logger.WithFields(logrus.Fields{
237 "stdout": string(stdout),
238 "stderr": string(stderr),
239 }).WithError(err).Warn("probe failed")
240 return nil, false, stderr
242 stdout = bytes.TrimRight(stdout, "\n")
243 if len(stdout) == 0 {
244 return nil, true, stderr
246 return strings.Split(string(stdout), "\n"), true, stderr
// probeBooted runs the pool's configured boot probe command
// (wkr.wp.bootProbeCommand) on the instance to check whether it has
// finished booting. Failures are logged at Debug level and success at
// Info level, presumably because failures are routine while an instance
// is still coming up. stderr is returned so the caller can log it.
// NOTE(review): the lines between reading bootProbeCommand and calling
// Execute are not visible here (possibly a default for an empty
// command) — confirm.
249 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
250 cmd := wkr.wp.bootProbeCommand
254 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
255 logger := wkr.logger.WithFields(logrus.Fields{
257 "stdout": string(stdout),
258 "stderr": string(stderr),
261 logger.WithError(err).Debug("boot probe failed")
264 logger.Info("boot probe succeeded")
// shutdownIfBroken is called by probeAndUpdate after a failed probe.
// dur is the time elapsed since wkr.probed. Workers in StateHold are
// special-cased first. The threshold is the pool's timeoutBooting for a
// Booting worker and timeoutProbe otherwise. The "new " label only
// changes the warning text.
// NOTE(review): the comparison of dur against threshold, and whatever
// follows the warning, are not visible in this listing. Presumably
// dur < threshold returns early and the warning is followed by
// wkr.shutdown() — confirm.
268 // caller must have lock.
269 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
270 if wkr.state == StateHold {
273 label, threshold := "", wkr.wp.timeoutProbe
274 if wkr.state == StateBooting {
275 label, threshold = "new ", wkr.wp.timeoutBooting
280 wkr.logger.WithFields(logrus.Fields{
284 }).Warnf("%sinstance unresponsive, shutting down", label)
// shutdownIfIdle checks two conditions before logging "shutdown idle
// worker": the worker must be in StateIdle, and time.Since(wkr.busy)
// must have reached the pool's timeoutIdle.
// NOTE(review): the early-return bodies and the final return are not
// visible here. Presumably it returns true only when a shutdown was
// initiated — confirm.
288 // caller must have lock.
289 func (wkr *worker) shutdownIfIdle() bool {
290 if wkr.state != StateIdle {
293 age := time.Since(wkr.busy)
294 if age < wkr.wp.timeoutIdle {
297 wkr.logger.WithField("Age", age).Info("shutdown idle worker")
302 // caller must have lock
303 func (wkr *worker) shutdown() {
307 wkr.state = StateShutdown
309 err := wkr.instance.Destroy()
311 wkr.logger.WithError(err).Warn("shutdown failed")