// Copyright (C) The Arvados Authors. All rights reserved.
// SPDX-License-Identifier: AGPL-3.0

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"

	maxPingFailTime = 10 * time.Minute
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.

	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
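
// Typical lifecycle (see probeAndUpdate and shutdown below): a worker
// starts out in StateUnknown or StateBooting, becomes StateIdle once a
// boot probe and runner-binary copy succeed, flips between StateIdle and
// StateRunning as containers start and finish, and ends up in
// StateShutdown once shutdown() is called.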

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateRunning:  "running",
	StateShutdown: "shutdown",

// String implements fmt.Stringer.
func (s State) String() string {

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
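
// For example (an illustration only, not code used elsewhere in this file),
//
//	json.Marshal(map[State]int{StateIdle: 1, StateRunning: 2})
//
// produces {"idle":1,"running":2} rather than numeric map keys.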

// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"

var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shut down on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shut down or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shut down immediately when idle

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,

	logger logrus.FieldLogger

	mtx          sync.Locker // must be wp's Locker.
	idleBehavior IdleBehavior
	instance     cloud.Instance
	instType     arvados.InstanceType

	firstSSHConnection  time.Time
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing             chan struct{}
	bootOutcomeReported bool
	timeToReadyReported bool
	staleRunLockSince   time.Time

func (wkr *worker) onUnkillable(uuid string) {
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)

func (wkr *worker) onKilled(uuid string) {
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	wkr.bootOutcomeReported = true

// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
	if wkr.timeToReadyReported {
	if wkr.wp.mTimeToReadyForContainer != nil {
		wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
	wkr.timeToReadyReported = true

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning

		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
			wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
		defer wkr.mtx.Unlock()
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	case wkr.probing <- struct{}{}:
		wkr.logger.Debug("still waiting for last probe to finish")
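		// (wkr.probing is used as a non-blocking try-lock here: if a
		// previous probe goroutine still holds the channel's buffer
		// slot, this call just logs and returns instead of queueing a
		// second concurrent probe.)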

// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown:  Call both probeBooted and probeRunning.
// In StateBooting:  Call probeBooted; if successful, call probeRunning.
// In StateRunning:  Call probeRunning.
// In StateIdle:     Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	updated := wkr.updated
	initialState := wkr.state

		stderr   []byte // from probeBooted

	switch initialState {
	case StateIdle, StateRunning:
	case StateUnknown, StateBooting:
		panic(fmt.Sprintf("unknown state %s", initialState))

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

		booted, stderr = wkr.probeBooted()
		shouldCopy := booted || initialState == StateUnknown
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			if wkr.state == StateRunning || wkr.state == StateIdle {
			_, stderrCopy, err := wkr.copyRunnerData()
				wkr.logger.WithError(err).WithField("stderr", string(stderrCopy)).Warn("error copying runner binary")
			logger.Info("instance booted; will try probeRunning")
	reportedBroken := false
	if booted || initialState == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shut down an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			wkr.reportBootOutcome(BootOutcomeFailed)
			logger.WithFields(logrus.Fields{
				"stderr": string(stderr),
			}).Info("boot failed")

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		logger.WithFields(logrus.Fields{
			"wkr.updated": wkr.updated,
		}).Debug("skipping worker state update due to probe/sync race")

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		if wkr.state == StateBooting {
			wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown && changed {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
		}).Info("crunch-run probe succeeded, but boot probe is still failing")

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
		}).Info("probes succeeded, instance is in service")

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {

	if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
		j, _ := json.Marshal(prices)
		stdin = bytes.NewReader(j)
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
		wkr.logger.WithFields(logrus.Fields{
			"stdout": string(stdout),
			"stderr": string(stderr),
		}).WithError(err).Warn("probe failed")
		wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
	wkr.logger.WithFields(logrus.Fields{
		"stdout": string(stdout),
		"stderr": string(stderr),
	}).Debug("probe succeeded")
	wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())

	staleRunLock := false
	for _, s := range strings.Split(string(stdout), "\n") {
		// Each line of the "crunch-run --list" output is one
		// of the following:
		//
		// * a container UUID, indicating that processes
		//   related to that container are currently running.
		//   Optionally followed by " stale", indicating that
		//   the crunch-run process itself has exited (the
		//   remaining process is probably arv-mount).
		//
		// * the string "broken", indicating that the instance
		//   appears incapable of starting containers.
		//
		// See ListProcesses() in lib/crunchrun/background.go.
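		//
		// Example output (made-up UUIDs, one possible mix of lines):
		//
		//	zzzzz-dz642-1234567890abcde
		//	zzzzz-dz642-abcde1234567890 stale
		//	broken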
			// empty string following final newline
		} else if s == "broken" {
		} else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
			// Ignore crunch-run processes that belong to
			// a different cluster (e.g., a single host
			// running multiple clusters with the loopback
		} else if toks := strings.Split(s, " "); len(toks) == 1 {
			running = append(running, s)
		} else if toks[1] == "stale" {
			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")

	defer wkr.mtx.Unlock()
		wkr.staleRunLockSince = time.Time{}
	} else if wkr.staleRunLockSince.IsZero() {
		wkr.staleRunLockSince = time.Now()
	} else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand

	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"stdout": string(stdout),
		"stderr": string(stderr),
		logger.WithError(err).Debug("boot probe failed")
	logger.Info("boot probe succeeded")

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed

	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"path": wkr.wp.runnerCmd,

	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
502 logger.Info("runner binary already exists on worker, with correct hash")
506 // Note touch+chmod come before writing data, to avoid the
507 // possibility of md5 being correct while file mode is
509 cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
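	// For a hypothetical wkr.wp.runnerCmd of "/opt/example/crunch-run"
	// (an illustrative path, not the configured default), cmd expands to
	// the following single line (wrapped here for readability):
	//
	//	set -e; dstdir="/opt/example/"; dstfile="/opt/example/crunch-run";
	//	mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"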
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {

	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting

	wkr.logger.WithFields(logrus.Fields{
	}).Warnf("%sinstance unresponsive, shutting down", label)

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {

	draining := wkr.idleBehavior == IdleBehaviorDrain
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
		for _, rr := range wkr.running {
		for _, rr := range wkr.starting {
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {

	wkr.logger.WithFields(logrus.Fields{
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.reportBootOutcome(BootOutcomeAborted)

// caller must have lock.
func (wkr *worker) shutdown() {
	wkr.state = StateShutdown

		err := wkr.instance.Destroy()
			wkr.logger.WithError(err).Warn("shutdown failed")

// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
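	// (With a tag key prefix of "Arvados", for example, these keys are
	// written as "Arvados"+tagKeyInstanceType and
	// "Arvados"+tagKeyIdleBehavior; the prefix keeps dispatcher-managed
	// tags distinct from any other tags on the same cloud account.)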
	for k, v := range update {

			err := instance.SetTags(tags)
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")

func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		if _, ok := wkr.running[uuid]; ok {
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
	for uuid := range wkr.running {
			wkr.closeRunner(uuid)

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]

	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)

	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle