1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.curoverse.com/arvados.git/lib/cloud"
14 "git.curoverse.com/arvados.git/sdk/go/arvados"
15 "github.com/sirupsen/logrus"
18 // State indicates whether a worker is available to do work, and (if
19 // not) whether/when it is expected to become ready.
23 StateUnknown State = iota // might be running a container already
24 StateBooting // instance is booting
25 StateIdle // instance booted, no containers are running
26 StateRunning // instance is running one or more containers
27 StateShutdown // worker has stopped monitoring the instance
28 StateHold // running, but not available to run new containers
33 maxPingFailTime = 10 * time.Minute
36 var stateString = map[State]string{
37 StateUnknown: "unknown",
38 StateBooting: "booting",
40 StateRunning: "running",
41 StateShutdown: "shutdown",
45 // String implements fmt.Stringer.
46 func (s State) String() string {
50 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
51 // map[State]anything uses the state's string representation.
52 func (s State) MarshalText() ([]byte, error) {
53 return []byte(stateString[s]), nil
57 logger logrus.FieldLogger
61 mtx sync.Locker // must be wp's Locker.
63 instance cloud.Instance
64 instType arvados.InstanceType
73 running map[string]struct{} // remember to update state idle<->running when this changes
74 starting map[string]struct{} // remember to update state idle<->running when this changes
78 // caller must have lock.
79 func (wkr *worker) startContainer(ctr arvados.Container) {
80 logger := wkr.logger.WithFields(logrus.Fields{
81 "ContainerUUID": ctr.UUID,
82 "Priority": ctr.Priority,
84 logger = logger.WithField("Instance", wkr.instance)
85 logger.Debug("starting container")
86 wkr.starting[ctr.UUID] = struct{}{}
87 wkr.state = StateRunning
89 stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
91 defer wkr.mtx.Unlock()
95 delete(wkr.starting, ctr.UUID)
96 wkr.running[ctr.UUID] = struct{}{}
97 wkr.lastUUID = ctr.UUID
99 logger.WithField("stdout", string(stdout)).
100 WithField("stderr", string(stderr)).
102 Error("error starting crunch-run process")
103 // Leave uuid in wkr.running, though: it's
104 // possible the error was just a communication
105 // failure and the process was in fact
106 // started. Wait for next probe to find out.
109 logger.Info("crunch-run process started")
110 wkr.lastUUID = ctr.UUID
114 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
115 // for the worker's current state. If a previous probe is still
116 // running, it does nothing.
118 // It should be called in a new goroutine.
119 func (wkr *worker) ProbeAndUpdate() {
121 case wkr.probing <- struct{}{}:
125 wkr.logger.Debug("still waiting for last probe to finish")
129 // should be called in a new goroutine
130 func (wkr *worker) probeAndUpdate() {
132 updated := wkr.updated
133 needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
134 needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
136 if !needProbeBooted && !needProbeRunning {
146 ok, stderr = wkr.probeBooted()
148 if ok || wkr.state == StateRunning || wkr.state == StateIdle {
149 wkr.logger.Info("instance booted; will try probeRunning")
150 needProbeRunning = true
154 if needProbeRunning {
155 ctrUUIDs, ok, stderr = wkr.probeRunning()
157 logger := wkr.logger.WithField("stderr", string(stderr))
159 defer wkr.mtx.Unlock()
161 if wkr.state == StateShutdown && wkr.updated.After(updated) {
162 // Skip the logging noise if shutdown was
163 // initiated during probe.
166 dur := time.Since(wkr.probed)
167 logger := logger.WithFields(logrus.Fields{
171 if wkr.state == StateBooting && !needProbeRunning {
172 // If we know the instance has never passed a
173 // boot probe, it's not noteworthy that it
174 // hasn't passed this probe.
175 logger.Debug("new instance not responding")
177 logger.Info("instance not responding")
179 wkr.shutdownIfBroken(dur)
183 updateTime := time.Now()
184 wkr.probed = updateTime
186 if updated != wkr.updated {
187 // Worker was updated after the probe began, so
188 // wkr.running might have a container UUID that was
189 // not yet running when ctrUUIDs was generated. Leave
190 // wkr.running alone and wait for the next probe to
191 // catch up on any changes.
195 if len(ctrUUIDs) > 0 {
196 wkr.busy = updateTime
197 wkr.lastUUID = ctrUUIDs[0]
198 } else if len(wkr.running) > 0 {
199 // Actual last-busy time was sometime between wkr.busy
200 // and now. Now is the earliest opportunity to take
201 // advantage of the non-busy state, though.
202 wkr.busy = updateTime
204 running := map[string]struct{}{}
206 for _, uuid := range ctrUUIDs {
207 running[uuid] = struct{}{}
208 if _, ok := wkr.running[uuid]; !ok {
212 for uuid := range wkr.running {
213 if _, ok := running[uuid]; !ok {
214 logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
215 wkr.wp.notifyExited(uuid, updateTime)
219 if wkr.state == StateUnknown || wkr.state == StateBooting {
220 wkr.state = StateIdle
224 wkr.running = running
225 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
226 wkr.state = StateRunning
227 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
228 wkr.state = StateIdle
230 wkr.updated = updateTime
235 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
236 cmd := "crunch-run --list"
237 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
239 wkr.logger.WithFields(logrus.Fields{
241 "stdout": string(stdout),
242 "stderr": string(stderr),
243 }).WithError(err).Warn("probe failed")
244 return nil, false, stderr
246 stdout = bytes.TrimRight(stdout, "\n")
247 if len(stdout) == 0 {
248 return nil, true, stderr
250 return strings.Split(string(stdout), "\n"), true, stderr
253 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
254 cmd := wkr.wp.bootProbeCommand
258 stdout, stderr, err := wkr.executor.Execute(cmd, nil)
259 logger := wkr.logger.WithFields(logrus.Fields{
261 "stdout": string(stdout),
262 "stderr": string(stderr),
265 logger.WithError(err).Debug("boot probe failed")
268 logger.Info("boot probe succeeded")
272 // caller must have lock.
273 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
274 if wkr.state == StateHold {
277 label, threshold := "", wkr.wp.timeoutProbe
278 if wkr.state == StateBooting {
279 label, threshold = "new ", wkr.wp.timeoutBooting
284 wkr.logger.WithFields(logrus.Fields{
288 }).Warnf("%sinstance unresponsive, shutting down", label)
292 // caller must have lock.
293 func (wkr *worker) shutdownIfIdle() bool {
294 if wkr.state != StateIdle {
297 age := time.Since(wkr.busy)
298 if age < wkr.wp.timeoutIdle {
301 wkr.logger.WithField("Age", age).Info("shutdown idle worker")
306 // caller must have lock
307 func (wkr *worker) shutdown() {
311 wkr.state = StateShutdown
314 err := wkr.instance.Destroy()
316 wkr.logger.WithError(err).Warn("shutdown failed")