// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker
import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)
const maxPingFailTime = 10 * time.Minute
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}
// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}
// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
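// For example, json.Marshal(map[State]int{StateIdle: 3}) yields
// {"idle":3} rather than a numeric key like {"2":3}.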
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)
var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker.
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	probed              time.Time
	updated             time.Time
	busy                time.Time
	firstSSHConnection  time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing             chan struct{}
	bootOutcomeReported bool
	timeToReadyReported bool
	staleRunLockSince   time.Time
}
func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}
func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}
// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
		return
	}
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	}
	wkr.bootOutcomeReported = true
}
// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
	if wkr.timeToReadyReported {
		return
	}
	if wkr.wp.mTimeToReadyForContainer != nil {
		wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
	}
	wkr.timeToReadyReported = true
}
// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}
// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
			wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
		}
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		if wkr.starting[ctr.UUID] != rr {
			// Someone else (e.g., wkr.probeAndUpdate() ->
			// wkr.updateRunning() or wkr.Close()) already
			// moved our runner from wkr.starting to
			// wkr.running or deleted it while we were in
			// rr.Start().
			return
		}
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}
// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
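//
// Typically:
//
//	go wkr.ProbeAndUpdate()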
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
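//
// For example, a worker in StateBooting whose boot probe succeeds and
// whose run probe reports one container ends the same pass in
// StateRunning: the first successful boot probe moves it to StateIdle,
// and the non-empty wkr.running set then flips it to StateRunning.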
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
		errLast  error  // from probeBooted or copyRunnerData
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}
	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		stderr, errLast = wkr.probeBooted()
		booted = errLast == nil
		shouldCopy := booted || initialState == StateUnknown
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			if wkr.state == StateRunning || wkr.state == StateIdle {
				booted = true
				shouldCopy = false
			}
			wkr.mtx.Unlock()
		}
		if shouldCopy {
			_, stderrCopy, err := wkr.copyRunnerData()
			if err != nil {
				booted = false
				wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
				errLast = err
			}
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
	if booted || initialState == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but some boot-probe
			// failures are normal so they are logged only
			// at Debug level. This may be our chance to
			// log some evidence about why the node never
			// booted, even in non-debug mode.
			wkr.reportBootOutcome(BootOutcomeFailed)
			logger.WithFields(logrus.Fields{
				"Duration": dur,
				"stderr":   string(stderr),
			}).WithError(errLast).Info("boot failed")
		}
		return
	}
	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		logger.WithFields(logrus.Fields{
			"updated":     updated,
			"wkr.updated": wkr.updated,
		}).Debug("skipping worker state update due to probe/sync race")
		return
	}
	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		if wkr.state == StateBooting {
			wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
		}
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}
	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}
func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	before := time.Now()
	var stdin io.Reader
	if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
		j, _ := json.Marshal(prices)
		stdin = bytes.NewReader(j)
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
		return
	}
	wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	}).Debug("probe succeeded")
	wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
	ok = true
	staleRunLock := false
	for _, s := range strings.Split(string(stdout), "\n") {
		// Each line of the "crunch-run --list" output is one
		// of the following:
		//
		// * a container UUID, indicating that processes
		//   related to that container are currently running.
		//   Optionally followed by " stale", indicating that
		//   the crunch-run process itself has exited (the
		//   remaining process is probably arv-mount).
		//
		// * the string "broken", indicating that the instance
		//   appears incapable of starting containers.
		//
		// See ListProcesses() in lib/crunchrun/background.go.
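		//
		// For illustration only (hypothetical UUIDs, assuming the
		// local cluster ID is "zzzzz"), a reply might look like:
		//
		//	zzzzz-dz642-aaaaaaaaaaaaaaa
		//	zzzzz-dz642-bbbbbbbbbbbbbbb stale
		//	broken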
		if s == "" {
			// empty string following final newline
		} else if s == "broken" {
			reportsBroken = true
		} else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
			// Ignore crunch-run processes that belong to
			// a different cluster (e.g., a single host
			// running multiple clusters with the loopback
			// driver)
			continue
		} else if toks := strings.Split(s, " "); len(toks) == 1 {
			running = append(running, s)
		} else if toks[1] == "stale" {
			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
			staleRunLock = true
		}
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !staleRunLock {
		wkr.staleRunLockSince = time.Time{}
	} else if wkr.staleRunLockSince.IsZero() {
		wkr.staleRunLockSince = time.Now()
	} else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
		reportsBroken = true
	}
	return
}
func (wkr *worker) probeBooted() (stderr []byte, err error) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		if errors.Is(err, sshexecutor.ErrNoAddress) ||
			errors.As(err, new(*net.OpError)) ||
			errors.As(err, new(*ssh.ExitError)) {
			// These errors are expected while the
			// instance is booting, so we only log them at
			// debug level.
			logger.WithError(err).Debug("boot probe failed")
		} else {
			// Other errors are more likely to indicate a
			// configuration problem, and it's more
			// sysadmin-friendly to show them right away
			// instead of waiting until boot timeout and
			// only showing the last error.
			//
			// Example: "ssh: handshake failed: ssh:
			// unable to authenticate, attempted methods
			// [none publickey], no supported methods
			// remain"
			logger.WithError(err).Warn("boot probe failed")
		}
		return stderr, err
	}
	logger.Info("boot probe succeeded")
	return stderr, nil
}
func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
		return
	}

	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,
	})

	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")
		return
	}

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
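	//
	// For illustration only: with a hypothetical runnerCmd of
	// /var/lib/arvados/bin/crunch-run and a non-root remote user,
	// the command sent below ends up roughly like
	//
	//	sudo sh -c 'set -e; dstdir="/var/lib/arvados/bin/"; dstfile="/var/lib/arvados/bin/crunch-run"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"'
	//
	// with the binary contents streamed to its stdin.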
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	}
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
	return
}
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	prologue, epilogue, threshold := "", "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		prologue = "boot failed: "
		epilogue = " -- `arvados-server cloudtest` might help troubleshoot, see https://doc.arvados.org/main/admin/cloudtest.html"
		threshold = wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down%s", prologue, epilogue)
	wkr.shutdown()
	return true
}
// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return true
}
// caller must have lock.
func (wkr *worker) shutdown() {
	wkr.updated = time.Now()
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
		}
	}()
}
// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
			}
		}()
	}
}
// Close abandons the worker's remote runners and shuts down its
// executor. It does not destroy the cloud instance.
func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
		delete(wkr.running, uuid)
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
		delete(wkr.starting, uuid)
	}
}
// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// already monitoring this container
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}
// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}