// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/lib/cloud"
16 "git.curoverse.com/arvados.git/sdk/go/arvados"
17 "git.curoverse.com/arvados.git/sdk/go/stats"
18 "github.com/sirupsen/logrus"

const (
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)
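
// A worker typically progresses from StateUnknown or StateBooting to
// StateIdle, moves between StateIdle and StateRunning as containers
// start and finish, and ends in StateShutdown; see probeAndUpdate and
// shutdown below for the transitions.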

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
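
// For example, with this marshaler a map[State]int such as
// map[State]int{StateIdle: 3, StateRunning: 2} encodes as
// {"idle":3,"running":2} rather than with numeric keys.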

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)
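
// A worker's IdleBehavior is also written to the instance's tags (see
// saveTags below), so the hold/drain setting can be recovered from the
// cloud provider after a dispatcher restart.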

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
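
// Code that accepts an IdleBehavior from outside (e.g. a value read
// back from an instance tag) can check validIdleBehavior to reject
// unrecognized values before applying them.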

// worker represents one cloud VM managed by the pool.
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	probed       time.Time
	updated      time.Time
	busy         time.Time
	lastUUID     string
	running      map[string]struct{} // remember to update state idle<->running when this changes
	starting     map[string]struct{} // remember to update state idle<->running when this changes
	probing      chan struct{}
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger = logger.WithField("Instance", wkr.instance.ID())
	logger.Debug("starting container")
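	// Record the container as "starting" right away so concurrent
	// probes and scheduling decisions see it; the goroutine below
	// moves it to "running" once crunch-run has been invoked.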
	wkr.starting[ctr.UUID] = struct{}{}
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		env := map[string]string{
			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
		}
		if wkr.wp.arvClient.Insecure {
			env["ARVADOS_API_HOST_INSECURE"] = "1"
		}
		envJSON, err := json.Marshal(env)
		if err != nil {
			// env contains only strings, so this can't happen.
			panic(err)
		}
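		// envJSON is passed to crunch-run on stdin, e.g.:
		//   {"ARVADOS_API_HOST":"zzzzz.arvadosapi.com","ARVADOS_API_TOKEN":"..."}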
		stdin := bytes.NewBuffer(envJSON)
		cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
		if u := wkr.instance.RemoteUser(); u != "root" {
			cmd = "sudo " + cmd
		}
		stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		wkr.lastUUID = ctr.UUID
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started. Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		// The send would block, so a probe is already in
		// flight; skip this one.
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe is still failing.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, ok = wkr.probeRunning()
	}
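	// From here on, probe results are reconciled with the
	// worker's shared state, so the lock is held for the rest of
	// this function.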
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shut down an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	changed := false

	// Build a new "running" map. Set changed=true if it differs
	// from the existing map (wkr.running) to ensure the scheduler
	// gets notified below.
	running := map[string]struct{}{}
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			if _, ok := wkr.starting[uuid]; !ok {
				// We didn't start it -- it must have
				// been started by a previous
				// dispatcher process.
				logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			}
			changed = true
		}
	}

	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && len(running) != len(wkr.running) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	wkr.running = running
	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, ok bool) {
	cmd := "crunch-run --list"
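	// "crunch-run --list" prints one container UUID per line; an
	// empty reply means no crunch-run processes are running.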
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true
	}
	return strings.Split(string(stdout), "\n"), true
}

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	age := time.Since(wkr.busy)

	old := age >= wkr.wp.timeoutIdle
	draining := wkr.idleBehavior == IdleBehaviorDrain
	shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
		(draining && wkr.state == StateBooting)
	if !shouldShutdown {
		return false
	}

	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(age),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown idle worker")
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	wkr.updated = time.Now()
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		tagKeyInstanceType: wkr.instType.Name,
		tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		// Update the cloud provider in the background rather
		// than holding the lock during the remote call.
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}