// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)
const (
	maxPingFailTime = 10 * time.Minute
)
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)
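
// A worker's typical lifecycle: it starts in StateBooting (or
// StateUnknown if the dispatcher restarted and found the instance
// already running), becomes StateIdle after its first successful
// boot probe, moves between StateIdle and StateRunning as containers
// start and finish, and ends in StateShutdown.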
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}
// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}
// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
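
// Implementing MarshalText (rather than MarshalJSON) is what makes,
// for example,
//
//	json.Marshal(map[State]int{StateIdle: 3})
//
// encode as {"idle":3} rather than {"2":3}: encoding/json consults
// encoding.TextMarshaler when encoding map keys.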
// BootOutcome is the result of a worker boot. It is used as a label
// in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)
var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle
)
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
// worker tracks the state of a single cloud instance and the
// containers running on it.
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker.
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	probed              time.Time
	updated             time.Time
	busy                time.Time
	destroyed           time.Time
	firstSSHConnection  time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing             chan struct{}
	bootOutcomeReported bool
	timeToReadyReported bool
	staleRunLockSince   time.Time
}
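
// onUnkillable is called by a remoteRunner when a container has
// survived repeated kill attempts. Unless the worker is held, it is
// switched to IdleBehaviorDrain so the stuck instance will be shut
// down once nothing else is running on it.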
func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}
func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}
// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
		return
	}
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	}
	wkr.bootOutcomeReported = true
}
// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
	if wkr.timeToReadyReported {
		return
	}
	if wkr.wp.mTimeToReadyForContainer != nil {
		wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
	}
	wkr.timeToReadyReported = true
}
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
			wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
		}
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
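
// The probing channel is a buffered channel of capacity 1 (created
// elsewhere in this package as make(chan struct{}, 1)), used here as
// a try-lock: the send above succeeds only when no other probe holds
// the slot, and the receive after probeAndUpdate releases it, so at
// most one probe runs per worker at a time.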
// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
		errLast  error  // from probeBooted or copyRunnerData
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		stderr, errLast = wkr.probeBooted()
		booted = errLast == nil
		shouldCopy := booted || initialState == StateUnknown
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			if wkr.state == StateRunning || wkr.state == StateIdle {
				booted = true
				shouldCopy = false
			}
			wkr.mtx.Unlock()
		}
		if shouldCopy {
			_, stderrCopy, err := wkr.copyRunnerData()
			if err != nil {
				booted = false
				wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
				errLast = err
			}
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
	if booted || initialState == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shut down an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but some boot-probe
			// failures are normal so they are logged only
			// at Debug level. This may be our chance to
			// log some evidence about why the node never
			// booted, even in non-debug mode.
			wkr.reportBootOutcome(BootOutcomeFailed)
			logger.WithFields(logrus.Fields{
				"Duration": dur,
				"stderr":   string(stderr),
			}).WithError(errLast).Info("boot failed")
		}
		return
	}
	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		logger.WithFields(logrus.Fields{
			"updated":     updated,
			"wkr.updated": wkr.updated,
		}).Debug("skipping worker state update due to probe/sync race")
		return
	}
	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		if wkr.state == StateBooting {
			wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
		}
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}
	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	before := time.Now()
	var stdin io.Reader
	if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
		j, _ := json.Marshal(prices)
		stdin = bytes.NewReader(j)
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
		return
	}
	wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	}).Debug("probe succeeded")
	wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
	ok = true
	staleRunLock := false
	for _, s := range strings.Split(string(stdout), "\n") {
		// Each line of the "crunch-run --list" output is one
		// of the following:
		//
		// * a container UUID, indicating that processes
		//   related to that container are currently running.
		//   Optionally followed by " stale", indicating that
		//   the crunch-run process itself has exited (the
		//   remaining process is probably arv-mount).
		//
		// * the string "broken", indicating that the instance
		//   appears incapable of starting containers.
		//
		// See ListProcesses() in lib/crunchrun/background.go.
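		//
		// For illustration, with hypothetical UUIDs, a
		// probe's stdout might read:
		//
		//	zzzzz-dz642-aaaaaaaaaaaaaaa
		//	zzzzz-dz642-bbbbbbbbbbbbbbb stale
		//	broken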
		if s == "" {
			// empty string following final newline
		} else if s == "broken" {
			reportsBroken = true
		} else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
			// Ignore crunch-run processes that belong to
			// a different cluster (e.g., a single host
			// running multiple clusters with the loopback
			// driver)
			continue
		} else if toks := strings.Split(s, " "); len(toks) == 1 {
			running = append(running, s)
		} else if toks[1] == "stale" {
			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
			staleRunLock = true
		}
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !staleRunLock {
		wkr.staleRunLockSince = time.Time{}
	} else if wkr.staleRunLockSince.IsZero() {
		wkr.staleRunLockSince = time.Now()
	} else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
		reportsBroken = true
	}
	return
}
func (wkr *worker) probeBooted() (stderr []byte, err error) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		if errors.Is(err, sshexecutor.ErrNoAddress) ||
			errors.As(err, new(*net.OpError)) ||
			errors.As(err, new(*ssh.ExitError)) {
			// These errors are expected while the
			// instance is booting, so we only log them at
			// debug level.
			logger.WithError(err).Debug("boot probe failed")
		} else {
			// Other errors are more likely to indicate a
			// configuration problem, and it's more
			// sysadmin-friendly to show them right away
			// instead of waiting until boot timeout and
			// only showing the last error.
			//
			// Example: "ssh: handshake failed: ssh:
			// unable to authenticate, attempted methods
			// [none publickey], no supported methods
			// remain"
			logger.WithError(err).Warn("boot probe failed")
		}
		return stderr, err
	}
	logger.Info("boot probe succeeded")
	return stderr, nil
}
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
		return
	}

	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,
	})

	// md5sum prints "{hash}  {filename}\n" (note the two-space
	// separator).
	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")
		return
	}

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	}
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
	return
}
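
// The escaping above is the standard shell idiom for nesting a
// script inside sudo sh -c '...': each single quote in the script is
// replaced with '\'' (close quote, escaped literal quote, reopen
// quote), so the inner script reaches the remote shell verbatim.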
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}
// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}
603 func (wkr *worker) shutdownIfIdle() bool {
604 if !wkr.eligibleForShutdown() {
607 wkr.logger.WithFields(logrus.Fields{
609 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
610 "IdleBehavior": wkr.idleBehavior,
611 }).Info("shutdown worker")
612 wkr.reportBootOutcome(BootOutcomeAborted)
// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
		}
	}()
}
// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warn("error updating tags")
			}
		}()
	}
}
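
// Close abandons the worker's remaining crunch-run processes and
// closes its SSH connection. It does not destroy the cloud instance.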
func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}
// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}