// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
)

const maxPingFailTime = 10 * time.Minute

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
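
// Illustrative sketch (hypothetical caller): because State implements
// encoding.TextMarshaler, encoding/json uses the string form for map
// keys, so a summary keyed by State marshals to readable JSON:
//
//	json.Marshal(map[State]int{StateIdle: 3, StateRunning: 2})
//	// => {"idle":3,"running":2}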

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
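
// Illustrative sketch (hypothetical caller): validIdleBehavior makes it
// easy to reject an unrecognized IdleBehavior received from an API
// request before applying it:
//
//	want := IdleBehavior(reqParam)
//	if !validIdleBehavior[want] {
//		return fmt.Errorf("invalid idle behavior %q", reqParam)
//	}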

type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx          sync.Locker // must be wp's Locker.
	state        State
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType
	probed       time.Time
	updated      time.Time
	busy         time.Time
	destroyed    time.Time
	lastUUID     string
	running      map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting     map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing      chan struct{}
}

func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}

func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
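
// Usage sketch (assumed caller, illustrative only): the pool is
// expected to call this periodically for every worker, each time in a
// new goroutine, so one slow instance can't delay probes of the
// others; the non-blocking send on wkr.probing above collapses
// overlapping requests into a single probe.
//
//	for _, wkr := range workers {
//		go wkr.ProbeAndUpdate()
//	}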

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := "crunch-run --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return
	}
	ok = true
	for _, s := range strings.Split(string(stdout), "\n") {
		if s == "broken" {
			reportsBroken = true
		} else if s != "" {
			running = append(running, s)
		}
	}
	return
}
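
// Sketch of the stdout format assumed by the parsing above: one
// container UUID per line, plus a bare "broken" line when the node
// reports itself unhealthy; blank lines are ignored. For example:
//
//	zzzzz-dz642-1234567890abcde
//	zzzzz-dz642-fedcba098765432
//	broken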

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}
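
// Illustrative note: bootProbeCommand comes from the pool's
// configuration; a typical choice is a command that keeps failing
// until the instance is ready to run containers, e.g. "docker ps -q".
// An empty command is treated as "true" above, so the probe succeeds
// as soon as the executor can run a trivial command on the instance.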

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
		}
	}()
}

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}

func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// already known; no change
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}