1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "git.curoverse.com/arvados.git/lib/cloud"
15 "git.curoverse.com/arvados.git/sdk/go/arvados"
16 "github.com/sirupsen/logrus"
21 maxPingFailTime = 10 * time.Minute
24 // State indicates whether a worker is available to do work, and (if
25 // not) whether/when it is expected to become ready.
29 StateUnknown State = iota // might be running a container already
30 StateBooting // instance is booting
31 StateIdle // instance booted, no containers are running
32 StateRunning // instance is running one or more containers
33 StateShutdown // worker has stopped monitoring the instance
36 var stateString = map[State]string{
37 StateUnknown: "unknown",
38 StateBooting: "booting",
40 StateRunning: "running",
41 StateShutdown: "shutdown",
44 // String implements fmt.Stringer.
45 func (s State) String() string {
49 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
50 // map[State]anything uses the state's string representation.
51 func (s State) MarshalText() ([]byte, error) {
52 return []byte(stateString[s]), nil
55 // IdleBehavior indicates the behavior desired when a node becomes idle; the recognized values are IdleBehaviorRun, IdleBehaviorHold, and IdleBehaviorDrain (see validIdleBehavior).
56 type IdleBehavior string
59 IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
60 IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers
61 IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
64 var validIdleBehavior = map[IdleBehavior]bool{
65 IdleBehaviorRun: true,
66 IdleBehaviorHold: true,
67 IdleBehaviorDrain: true,
71 logger logrus.FieldLogger
75 mtx sync.Locker // must be wp's Locker.
77 idleBehavior IdleBehavior
78 instance cloud.Instance
79 instType arvados.InstanceType
88 running map[string]struct{} // remember to update state idle<->running when this changes
89 starting map[string]struct{} // remember to update state idle<->running when this changes
93 // caller must have lock.
94 func (wkr *worker) startContainer(ctr arvados.Container) {
95 logger := wkr.logger.WithFields(logrus.Fields{
96 "ContainerUUID": ctr.UUID,
97 "Priority": ctr.Priority,
99 logger = logger.WithField("Instance", wkr.instance.ID())
100 logger.Debug("starting container")
101 wkr.starting[ctr.UUID] = struct{}{}
102 wkr.state = StateRunning
104 cmd := "crunch-run --detach '" + ctr.UUID + "'"
105 stdin := bytes.NewBufferString(fmt.Sprintf("export %s=%q\nexport %s=%q\n",
106 "ARVADOS_API_HOST", wkr.wp.arvClient.APIHost,
107 "ARVADOS_API_TOKEN", wkr.wp.arvClient.AuthToken))
108 if u := wkr.instance.RemoteUser(); u != "root" {
109 cmd = "sudo -E " + cmd
111 cmd = "source /dev/stdin; " + cmd
112 stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
114 defer wkr.mtx.Unlock()
118 delete(wkr.starting, ctr.UUID)
119 wkr.running[ctr.UUID] = struct{}{}
120 wkr.lastUUID = ctr.UUID
122 logger.WithField("stdout", string(stdout)).
123 WithField("stderr", string(stderr)).
125 Error("error starting crunch-run process")
126 // Leave uuid in wkr.running, though: it's
127 // possible the error was just a communication
128 // failure and the process was in fact
129 // started. Wait for next probe to find out.
132 logger.Info("crunch-run process started")
133 wkr.lastUUID = ctr.UUID
137 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
138 // for the worker's current state. If a previous probe is still
139 // running, it does nothing.
141 // It should be called in a new goroutine.
142 func (wkr *worker) ProbeAndUpdate() {
144 case wkr.probing <- struct{}{}:
148 wkr.logger.Debug("still waiting for last probe to finish")
152 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
153 // updates state accordingly.
155 // In StateUnknown: Call both probeBooted and probeRunning.
156 // In StateBooting: Call probeBooted; if successful, call probeRunning.
157 // In StateRunning: Call probeRunning.
158 // In StateIdle: Call probeRunning.
159 // In StateShutdown: Do nothing.
161 // If both probes succeed, wkr.state changes to
162 // StateIdle/StateRunning.
164 // If probeRunning succeeds, wkr.running is updated. (This means
165 // wkr.running might be non-empty even in StateUnknown, if the boot
168 // probeAndUpdate should be called in a new goroutine.
169 func (wkr *worker) probeAndUpdate() {
171 updated := wkr.updated
172 initialState := wkr.state
179 stderr []byte // from probeBooted
182 switch initialState {
185 case StateIdle, StateRunning:
187 case StateUnknown, StateBooting:
189 panic(fmt.Sprintf("unknown state %s", initialState))
192 probeStart := time.Now()
193 logger := wkr.logger.WithField("ProbeStart", probeStart)
196 booted, stderr = wkr.probeBooted()
198 // Pretend this probe succeeded if another
199 // concurrent attempt succeeded.
201 booted = wkr.state == StateRunning || wkr.state == StateIdle
205 logger.Info("instance booted; will try probeRunning")
208 if booted || wkr.state == StateUnknown {
209 ctrUUIDs, ok = wkr.probeRunning()
212 defer wkr.mtx.Unlock()
213 if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
214 if wkr.state == StateShutdown && wkr.updated.After(updated) {
215 // Skip the logging noise if shutdown was
216 // initiated during probe.
219 // Using the start time of the probe as the timeout
220 // threshold ensures we always initiate at least one
221 // probe attempt after the boot/probe timeout expires
222 // (otherwise, a slow probe failure could cause us to
223 // shutdown an instance even though it did in fact
224 // boot/recover before the timeout expired).
225 dur := probeStart.Sub(wkr.probed)
226 if wkr.shutdownIfBroken(dur) {
227 // stderr from failed run-probes will have
228 // been logged already, but boot-probe
229 // failures are normal so they are logged only
230 // at Debug level. This is our chance to log
231 // some evidence about why the node never
232 // booted, even in non-debug mode.
234 logger.WithFields(logrus.Fields{
236 "stderr": string(stderr),
237 }).Info("boot failed")
243 updateTime := time.Now()
244 wkr.probed = updateTime
246 if updated != wkr.updated {
247 // Worker was updated after the probe began, so
248 // wkr.running might have a container UUID that was
249 // not yet running when ctrUUIDs was generated. Leave
250 // wkr.running alone and wait for the next probe to
251 // catch up on any changes.
255 if len(ctrUUIDs) > 0 {
256 wkr.busy = updateTime
257 wkr.lastUUID = ctrUUIDs[0]
258 } else if len(wkr.running) > 0 {
259 // Actual last-busy time was sometime between wkr.busy
260 // and now. Now is the earliest opportunity to take
261 // advantage of the non-busy state, though.
262 wkr.busy = updateTime
266 // Build a new "running" map. Set changed=true if it differs
267 // from the existing map (wkr.running) to ensure the scheduler
268 // gets notified below.
269 running := map[string]struct{}{}
270 for _, uuid := range ctrUUIDs {
271 running[uuid] = struct{}{}
272 if _, ok := wkr.running[uuid]; !ok {
273 if _, ok := wkr.starting[uuid]; !ok {
274 // We didn't start it -- it must have
275 // been started by a previous
276 // dispatcher process.
277 logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
282 for uuid := range wkr.running {
283 if _, ok := running[uuid]; !ok {
284 logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
285 wkr.wp.notifyExited(uuid, updateTime)
290 // Update state if this was the first successful boot-probe.
291 if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
292 // Note: this will change again below if
293 // len(wkr.starting)+len(wkr.running) > 0.
294 wkr.state = StateIdle
298 // If wkr.state and wkr.running aren't changing then there's
299 // no need to log anything, notify the scheduler, move state
300 // back and forth between idle/running, etc.
305 // Log whenever a run-probe reveals crunch-run processes
306 // appearing/disappearing before boot-probe succeeds.
307 if wkr.state == StateUnknown && len(running) != len(wkr.running) {
308 logger.WithFields(logrus.Fields{
309 "RunningContainers": len(running),
311 }).Info("crunch-run probe succeeded, but boot probe is still failing")
314 wkr.running = running
315 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
316 wkr.state = StateRunning
317 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
318 wkr.state = StateIdle
320 wkr.updated = updateTime
321 if booted && (initialState == StateUnknown || initialState == StateBooting) {
322 logger.WithFields(logrus.Fields{
323 "RunningContainers": len(running),
325 }).Info("probes succeeded, instance is in service")
330 func (wkr *worker) probeRunning() (running []string, ok bool) {
331 cmd := "crunch-run --list"
332 if u := wkr.instance.RemoteUser(); u != "root" {
335 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
337 wkr.logger.WithFields(logrus.Fields{
339 "stdout": string(stdout),
340 "stderr": string(stderr),
341 }).WithError(err).Warn("probe failed")
344 stdout = bytes.TrimRight(stdout, "\n")
345 if len(stdout) == 0 {
348 return strings.Split(string(stdout), "\n"), true
351 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
352 cmd := wkr.wp.bootProbeCommand
356 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
357 logger := wkr.logger.WithFields(logrus.Fields{
359 "stdout": string(stdout),
360 "stderr": string(stderr),
363 logger.WithError(err).Debug("boot probe failed")
366 logger.Info("boot probe succeeded")
370 // caller must have lock.
371 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
372 if wkr.idleBehavior == IdleBehaviorHold {
376 label, threshold := "", wkr.wp.timeoutProbe
377 if wkr.state == StateUnknown || wkr.state == StateBooting {
378 label, threshold = "new ", wkr.wp.timeoutBooting
383 wkr.logger.WithFields(logrus.Fields{
387 }).Warnf("%sinstance unresponsive, shutting down", label)
392 // caller must have lock.
393 func (wkr *worker) shutdownIfIdle() bool {
394 if wkr.idleBehavior == IdleBehaviorHold {
398 age := time.Since(wkr.busy)
400 old := age >= wkr.wp.timeoutIdle
401 draining := wkr.idleBehavior == IdleBehaviorDrain
402 shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
403 (draining && wkr.state == StateBooting)
408 wkr.logger.WithFields(logrus.Fields{
411 "IdleBehavior": wkr.idleBehavior,
412 }).Info("shutdown idle worker")
417 // caller must have lock.
418 func (wkr *worker) shutdown() {
422 wkr.state = StateShutdown
425 err := wkr.instance.Destroy()
427 wkr.logger.WithError(err).Warn("shutdown failed")
433 // saveTags saves worker tags to cloud provider metadata if they don't already
434 // match. Caller must have lock.
435 func (wkr *worker) saveTags() {
436 instance := wkr.instance
437 have := instance.Tags()
438 want := cloud.InstanceTags{
439 tagKeyInstanceType: wkr.instType.Name,
440 tagKeyIdleBehavior: string(wkr.idleBehavior),
443 for k, v := range want {
447 err := instance.SetTags(want)
449 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")