// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
)
const maxPingFailTime = 10 * time.Minute
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)
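
// For illustration, a typical lifecycle inferred from the state
// comments above (a sketch, not an exhaustive transition table):
//
//	StateUnknown -> StateBooting -> StateIdle <-> StateRunning -> StateShutdown
//
// StateUnknown also covers instances discovered at startup, which may
// already be running containers.
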
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}
// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}
// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
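
// For illustration (a hedged sketch; encoding/json is not imported in
// this file): because State implements TextMarshaler, a map keyed by
// State marshals with readable keys:
//
//	m := map[State]int{StateIdle: 3, StateRunning: 2}
//	buf, _ := json.Marshal(m)
//	fmt.Println(string(buf)) // {"idle":3,"running":2}
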
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
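
// A hedged usage sketch (the caller shown here is hypothetical, not
// part of this file): code accepting an idle behavior string from a
// config or API request would validate it against validIdleBehavior
// before applying it:
//
//	ib := IdleBehavior(req.IdleBehavior)
//	if !validIdleBehavior[ib] {
//		return fmt.Errorf("invalid idle behavior %q", ib)
//	}
//	wkr.idleBehavior = ib
//	wkr.saveTags()
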
type worker struct {
	logger   logrus.FieldLogger
	executor Executor // defined elsewhere in this package
	wp       *Pool    // defined elsewhere in this package

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	probed       time.Time
	updated      time.Time
	busy         time.Time
	destroyed    time.Time
	lastUUID     string
	running      map[string]struct{} // remember to update state idle<->running when this changes
	starting     map[string]struct{} // remember to update state idle<->running when this changes
	probing      chan struct{}
}
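
// Locking discipline, for illustration: wkr.mtx is the pool's Locker,
// so worker fields are mutated only while holding it (a hedged sketch
// of a hypothetical caller):
//
//	wkr.mtx.Lock()
//	wkr.idleBehavior = IdleBehaviorDrain
//	wkr.saveTags()
//	wkr.mtx.Unlock()
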
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger = logger.WithField("Instance", wkr.instance)
	logger.Debug("starting container")
	wkr.starting[ctr.UUID] = struct{}{}
	wkr.state = StateRunning
	go func() {
		env := map[string]string{
			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
		}
		stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		wkr.lastUUID = ctr.UUID
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started. Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
	}()
}
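
// The detach command built above looks like this (the UUID is
// hypothetical, shown only to illustrate the quoting):
//
//	crunch-run --detach 'zzzzz-dz642-xxxxxxxxxxxxxxx'
//
// The single quotes protect the UUID when the executor runs the
// command through a remote shell.
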
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
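
// The select above is a non-blocking "single flight" guard: assuming
// wkr.probing is created with capacity 1 elsewhere in this package,
// the send succeeds only when no probe is in flight, and the receive
// afterwards frees the slot. The pattern in isolation:
//
//	probing := make(chan struct{}, 1)
//	select {
//	case probing <- struct{}{}:
//		work()
//		<-probing
//	default:
//		// a previous call is still running; skip
//	}
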
// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe is still failing.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			wkr.logger.Info("instance booted; will try probeRunning")
		}
	}
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, ok, stderr = wkr.probeRunning()
	}
	logger := wkr.logger.WithField("stderr", string(stderr))
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting {
			// While we're polling the VM to see if it's
			// finished booting, failures are not
			// noteworthy, so we log at Debug level.
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		wkr.shutdownIfBroken(dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	} else if wkr.state == StateUnknown && len(running) != len(wkr.running) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if !changed {
		return
	}

	wkr.running = running
	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}
func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
	cmd := "crunch-run --list"
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false, stderr
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true, stderr
	}
	return strings.Split(string(stdout), "\n"), true, stderr
}
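
// probeRunning assumes crunch-run prints one container UUID per line,
// e.g. (hypothetical UUIDs):
//
//	$ crunch-run --list
//	zzzzz-dz642-aaaaaaaaaaaaaaa
//	zzzzz-dz642-bbbbbbbbbbbbbbb
//
// Empty output means no containers are running, hence the
// TrimRight/len check above.
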
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}
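
// The boot probe command comes from cluster configuration; a typical
// choice (an assumption here, not mandated by this file) is a command
// that keeps failing until the worker is ready to run containers,
// for example:
//
//	docker ps -q
//
// which exits non-zero until the Docker daemon is up.
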
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down a worker that's on hold.
		return
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
}
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down a worker that's on hold.
		return false
	}
	if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
		return false
	}
	age := time.Since(wkr.busy)
	if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Age":          age,
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown idle worker")
	wkr.shutdown()
	return true
}
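
// Timing sketch: with a hypothetical timeoutIdle of 60s and
// IdleBehaviorRun, a worker whose last container ended at time t stays
// alive until t+60s, then shuts down on the next shutdownIfIdle call.
// With IdleBehaviorDrain the age check is skipped, so shutdown happens
// as soon as the worker is idle (or still booting).
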
// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
		}
	}()
}
// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	have := instance.Tags()
	want := cloud.InstanceTags{
		tagKeyInstanceType: wkr.instType.Name,
		tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	for k, v := range want {
		if have[k] == v {
			continue
		}
		// At least one tag is outdated; push the full set once.
		err := instance.SetTags(want)
		if err != nil {
			wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
		}
		break
	}
}