1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/lib/cloud"
18 "git.arvados.org/arvados.git/sdk/go/arvados"
19 "git.arvados.org/arvados.git/sdk/go/stats"
20 "github.com/sirupsen/logrus"
25 maxPingFailTime = 10 * time.Minute
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

// stateString maps every State to its string label. It must cover all
// states: String() and MarshalText() would otherwise silently produce
// "" (the visible listing was missing the StateIdle entry).
var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
// BootOutcome is the result of a worker boot. It is used as a label in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)

// validBootOutcomes enumerates the outcome labels accepted for the
// boot-outcome metric.
var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}
// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

// validIdleBehavior is the set of IdleBehavior values accepted from
// configuration/tags.
var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}
// Fields of the worker struct: one managed cloud instance plus the
// dispatcher's bookkeeping about it.
// NOTE(review): the enclosing "type worker struct {" line and several
// fields (state, executor, wp, busy/probed timestamps, ...) are elided
// in this chunk -- confirm against the full file.
92 logger logrus.FieldLogger
96 mtx sync.Locker // must be wp's Locker.
// Desired behavior when idle: run / hold / drain.
98 idleBehavior IdleBehavior
99 instance cloud.Instance
100 instType arvados.InstanceType
// Time of the first successful SSH connection; used by
// reportTimeBetweenFirstSSHAndReadyForContainer.
108 firstSSHConnection time.Time
110 running map[string]*remoteRunner // remember to update state idle<->running when this changes
111 starting map[string]*remoteRunner // remember to update state idle<->running when this changes
// Used by ProbeAndUpdate as a non-blocking mutex (send succeeds only
// when no probe is in flight) -- presumably capacity 1; confirm where
// it is created.
112 probing chan struct{}
113 bootOutcomeReported bool // boot-outcome metric already reported for this worker
114 timeToReadyReported bool // time-to-ready metric already reported for this worker
115 staleRunLockSince time.Time // when a stale run lock was first seen (zero if none)
// onUnkillable handles a container whose crunch-run process could not
// be killed. With IdleBehavior=Hold it only logs a warning; otherwise
// the worker is drained so no further containers are scheduled on it.
// NOTE(review): the matching wkr.mtx.Lock() and the early return for
// the Hold case are elided in this chunk.
118 func (wkr *worker) onUnkillable(uuid string) {
120 defer wkr.mtx.Unlock()
121 logger := wkr.logger.WithField("ContainerUUID", uuid)
122 if wkr.idleBehavior == IdleBehaviorHold {
123 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
126 logger.Warn("unkillable container, draining worker")
127 wkr.setIdleBehavior(IdleBehaviorDrain)
// onKilled cleans up after a container's crunch-run process has been
// killed: the runner is removed from wkr.running under the lock.
// NOTE(review): the matching wkr.mtx.Lock() (and any trailing
// notification) is elided in this chunk.
130 func (wkr *worker) onKilled(uuid string) {
132 defer wkr.mtx.Unlock()
133 wkr.closeRunner(uuid)
// reportBootOutcome increments the boot-outcome metric (if configured)
// with the given outcome label. At most one outcome is ever reported
// per worker; later calls are no-ops.
137 // caller must have lock.
138 func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
// Already reported once -- NOTE(review): the early return is elided here.
139 if wkr.bootOutcomeReported {
142 if wkr.wp.mBootOutcomes != nil {
143 wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
145 wkr.bootOutcomeReported = true
148 // caller must have lock.
149 func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
150 if wkr.timeToReadyReported {
153 if wkr.wp.mTimeToSSH != nil {
154 wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
156 wkr.timeToReadyReported = true
// setIdleBehavior logs and records the worker's new idle behavior.
// NOTE(review): the end of this function (lines after the assignment,
// e.g. persisting the behavior) is elided in this chunk.
159 // caller must have lock.
160 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
161 wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
162 wkr.idleBehavior = idleBehavior
// startContainer starts the given container on this worker: it
// registers a remoteRunner under wkr.starting, flips the worker to
// StateRunning, observes the queue-to-crunch-run latency metric, and
// later moves the runner from starting to running.
// NOTE(review): the goroutine that actually launches the runner (and
// the wkr.mtx.Lock() matching the deferred Unlock below) is elided in
// this chunk -- the tail of this function appears to execute
// asynchronously after the runner has started.
167 // caller must have lock.
168 func (wkr *worker) startContainer(ctr arvados.Container) {
169 logger := wkr.logger.WithFields(logrus.Fields{
170 "ContainerUUID": ctr.UUID,
171 "Priority": ctr.Priority,
173 logger.Debug("starting container")
174 rr := newRemoteRunner(ctr.UUID, wkr)
175 wkr.starting[ctr.UUID] = rr
176 if wkr.state != StateRunning {
177 wkr.state = StateRunning
// Record how long the container waited between creation and crunch-run
// startup, if the metric is configured.
182 if wkr.wp.mTimeFromQueueToCrunchRun != nil {
183 wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
186 defer wkr.mtx.Unlock()
// Promote the runner from "starting" to "running".
190 delete(wkr.starting, ctr.UUID)
191 wkr.running[ctr.UUID] = rr
192 wkr.lastUUID = ctr.UUID
196 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
197 // for the worker's current state. If a previous probe is still
198 // running, it does nothing.
200 // It should be called in a new goroutine.
201 func (wkr *worker) ProbeAndUpdate() {
// A non-blocking send on wkr.probing acts as a try-lock: only one
// probe runs at a time. NOTE(review): the enclosing select statement,
// the probeAndUpdate() call, and the receive that releases the slot
// are elided in this chunk.
203 case wkr.probing <- struct{}{}:
207 wkr.logger.Debug("still waiting for last probe to finish")
211 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
212 // updates state accordingly.
214 // In StateUnknown: Call both probeBooted and probeRunning.
215 // In StateBooting: Call probeBooted; if successful, call probeRunning.
216 // In StateRunning: Call probeRunning.
217 // In StateIdle: Call probeRunning.
218 // In StateShutdown: Do nothing.
220 // If both probes succeed, wkr.state changes to
221 // StateIdle/StateRunning.
223 // If probeRunning succeeds, wkr.running is updated. (This means
224 // wkr.running might be non-empty even in StateUnknown, if the boot
227 // probeAndUpdate should be called in a new goroutine.
// NOTE(review): many interior lines (lock acquisitions, early
// returns, several case bodies) are elided in this chunk; comments
// below describe only what the visible code shows.
228 func (wkr *worker) probeAndUpdate() {
// Snapshot the update counter and state so we can detect concurrent
// changes made while the (unlocked) probes run.
230 updated := wkr.updated
231 initialState := wkr.state
238 stderr []byte // from probeBooted
241 switch initialState {
244 case StateIdle, StateRunning:
246 case StateUnknown, StateBooting:
248 panic(fmt.Sprintf("unknown state %s", initialState))
251 probeStart := time.Now()
252 logger := wkr.logger.WithField("ProbeStart", probeStart)
255 booted, stderr = wkr.probeBooted()
257 // Pretend this probe succeeded if another
258 // concurrent attempt succeeded.
260 booted = wkr.state == StateRunning || wkr.state == StateIdle
264 logger.Info("instance booted; will try probeRunning")
// Run the crunch-run probe when booted, and also in StateUnknown
// (the instance may already be running containers).
267 reportedBroken := false
268 if booted || wkr.state == StateUnknown {
269 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
272 defer wkr.mtx.Unlock()
273 if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
274 logger.Info("probe reported broken instance")
275 wkr.reportBootOutcome(BootOutcomeFailed)
276 wkr.setIdleBehavior(IdleBehaviorDrain)
278 if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
279 if wkr.state == StateShutdown && wkr.updated.After(updated) {
280 // Skip the logging noise if shutdown was
281 // initiated during probe.
284 // Using the start time of the probe as the timeout
285 // threshold ensures we always initiate at least one
286 // probe attempt after the boot/probe timeout expires
287 // (otherwise, a slow probe failure could cause us to
288 // shutdown an instance even though it did in fact
289 // boot/recover before the timeout expired).
290 dur := probeStart.Sub(wkr.probed)
291 if wkr.shutdownIfBroken(dur) {
292 // stderr from failed run-probes will have
293 // been logged already, but boot-probe
294 // failures are normal so they are logged only
295 // at Debug level. This is our chance to log
296 // some evidence about why the node never
297 // booted, even in non-debug mode.
299 wkr.reportBootOutcome(BootOutcomeFailed)
300 logger.WithFields(logrus.Fields{
302 "stderr": string(stderr),
303 }).Info("boot failed")
// Both probes (as applicable) succeeded: record the probe time and
// reconcile the observed container set with our bookkeeping.
309 updateTime := time.Now()
310 wkr.probed = updateTime
312 if updated != wkr.updated {
313 // Worker was updated after the probe began, so
314 // wkr.running might have a container UUID that was
315 // not yet running when ctrUUIDs was generated. Leave
316 // wkr.running alone and wait for the next probe to
317 // catch up on any changes.
318 logger.WithFields(logrus.Fields{
320 "wkr.updated": wkr.updated,
321 }).Debug("skipping worker state update due to probe/sync race")
325 if len(ctrUUIDs) > 0 {
326 wkr.busy = updateTime
327 wkr.lastUUID = ctrUUIDs[0]
328 } else if len(wkr.running) > 0 {
329 // Actual last-busy time was sometime between wkr.busy
330 // and now. Now is the earliest opportunity to take
331 // advantage of the non-busy state, though.
332 wkr.busy = updateTime
335 changed := wkr.updateRunning(ctrUUIDs)
337 // Update state if this was the first successful boot-probe.
338 if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
339 if wkr.state == StateBooting {
340 wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
342 // Note: this will change again below if
343 // len(wkr.starting)+len(wkr.running) > 0.
344 wkr.state = StateIdle
348 // If wkr.state and wkr.running aren't changing then there's
349 // no need to log anything, notify the scheduler, move state
350 // back and forth between idle/running, etc.
355 // Log whenever a run-probe reveals crunch-run processes
356 // appearing/disappearing before boot-probe succeeds.
357 if wkr.state == StateUnknown && changed {
358 logger.WithFields(logrus.Fields{
359 "RunningContainers": len(wkr.running),
361 }).Info("crunch-run probe succeeded, but boot probe is still failing")
// Reconcile idle<->running based on how many containers are starting
// or running.
364 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
365 wkr.state = StateRunning
366 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
367 wkr.state = StateIdle
369 wkr.updated = updateTime
370 if booted && (initialState == StateUnknown || initialState == StateBooting) {
371 wkr.reportBootOutcome(BootOutcomeSucceeded)
372 logger.WithFields(logrus.Fields{
373 "RunningContainers": len(wkr.running),
375 }).Info("probes succeeded, instance is in service")
// probeRunning executes "crunch-run --list" on the instance (via the
// SSH executor), optionally feeding instance price history on stdin,
// and parses the output into: container UUIDs currently running,
// whether the instance reports itself broken, and whether the probe
// itself succeeded.
// NOTE(review): several lines (the sudo-wrapping of cmd, the stdin
// declaration, "before" timestamp, error return, lock acquisition) are
// elided in this chunk.
380 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
381 cmd := wkr.wp.runnerCmd + " --list"
382 if u := wkr.instance.RemoteUser(); u != "root" {
// Send recent price history to crunch-run on stdin, if available.
387 if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
388 j, _ := json.Marshal(prices)
389 stdin = bytes.NewReader(j)
391 stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
393 wkr.logger.WithFields(logrus.Fields{
395 "stdout": string(stdout),
396 "stderr": string(stderr),
397 }).WithError(err).Warn("probe failed")
398 wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
401 wkr.logger.WithFields(logrus.Fields{
403 "stdout": string(stdout),
404 "stderr": string(stderr),
405 }).Debug("probe succeeded")
406 wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
409 staleRunLock := false
410 for _, s := range strings.Split(string(stdout), "\n") {
411 // Each line of the "crunch-run --list" output is one
414 // * a container UUID, indicating that processes
415 // related to that container are currently running.
416 // Optionally followed by " stale", indicating that
417 // the crunch-run process itself has exited (the
418 // remaining process is probably arv-mount).
420 // * the string "broken", indicating that the instance
421 // appears incapable of starting containers.
423 // See ListProcesses() in lib/crunchrun/background.go.
425 // empty string following final newline
426 } else if s == "broken" {
428 } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
429 // Ignore crunch-run processes that belong to
430 // a different cluster (e.g., a single host
431 // running multiple clusters with the loopback
434 } else if toks := strings.Split(s, " "); len(toks) == 1 {
435 running = append(running, s)
436 } else if toks[1] == "stale" {
437 wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
// Track how long stale run locks have been reported; after
// timeoutStaleRunLock the instance is treated as broken.
442 defer wkr.mtx.Unlock()
444 wkr.staleRunLockSince = time.Time{}
445 } else if wkr.staleRunLockSince.IsZero() {
// First stale-lock sighting: start the clock.
446 wkr.staleRunLockSince = time.Now()
447 } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
448 wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
// probeBooted runs the configured boot probe command on the instance.
// On success it also ensures the crunch-run binary is installed
// (loading runner data and copying it over if needed). Returns whether
// the boot probe succeeded plus accumulated stderr.
// NOTE(review): the early-return on probe failure and the "} else {"
// branch that introduces stderr2's scope before line 479 are elided in
// this chunk.
454 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
455 cmd := wkr.wp.bootProbeCommand
459 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
460 logger := wkr.logger.WithFields(logrus.Fields{
462 "stdout": string(stdout),
463 "stderr": string(stderr),
// Boot-probe failures are routine while an instance is still coming
// up, so they are logged at Debug level only.
466 logger.WithError(err).Debug("boot probe failed")
469 logger.Info("boot probe succeeded")
470 if err = wkr.wp.loadRunnerData(); err != nil {
471 wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
473 } else if len(wkr.wp.runnerData) == 0 {
474 // Assume crunch-run is already installed
475 } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
476 wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
477 return false, stderr2
479 stderr = append(stderr, stderr2...)
// copyRunnerData installs the crunch-run binary on the worker at
// wkr.wp.runnerCmd, skipping the copy if a file with the expected md5
// already exists there. Uses sudo when the remote user is not root.
// NOTE(review): the early return after the "already exists" log line is
// elided in this chunk.
484 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
485 hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
486 dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
487 logger := wkr.logger.WithFields(logrus.Fields{
489 "path": wkr.wp.runnerCmd,
// Fast path: compare the remote file's md5sum output against the
// expected "HASH  PATH" line.
492 stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
493 if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
494 logger.Info("runner binary already exists on worker, with correct hash")
498 // Note touch+chmod come before writing data, to avoid the
499 // possibility of md5 being correct while file mode is
501 cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
502 if wkr.instance.RemoteUser() != "root" {
// Single-quote the script for sudo sh -c, escaping embedded quotes.
503 cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
505 logger.WithField("cmd", cmd).Info("installing runner binary on worker")
506 stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
// shutdownIfBroken shuts the worker down if it has been unresponsive
// for longer than the applicable timeout (boot timeout for new
// instances, probe timeout otherwise). Hold workers are never shut
// down this way. Returns whether a shutdown was initiated.
// NOTE(review): the early returns, the duration comparison against
// "threshold", and the wkr.shutdown() call are elided in this chunk.
510 // caller must have lock.
511 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
512 if wkr.idleBehavior == IdleBehaviorHold {
// New (still-booting/unknown) instances get the longer boot timeout.
516 label, threshold := "", wkr.wp.timeoutProbe
517 if wkr.state == StateUnknown || wkr.state == StateBooting {
518 label, threshold = "new ", wkr.wp.timeoutBooting
523 wkr.logger.WithFields(logrus.Fields{
527 }).Warnf("%sinstance unresponsive, shutting down", label)
532 // Returns true if the instance is eligible for shutdown: either it's
533 // been idle too long, or idleBehavior=Drain and nothing is running.
535 // caller must have lock.
536 func (wkr *worker) eligibleForShutdown() bool {
// Hold workers are never eligible.
537 if wkr.idleBehavior == IdleBehaviorHold {
540 draining := wkr.idleBehavior == IdleBehaviorDrain
// NOTE(review): the switch/branch structure around this return and the
// loop bodies below (which inspect each runner) are elided in this
// chunk; the visible return applies to the idle case.
545 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
550 for _, rr := range wkr.running {
555 for _, rr := range wkr.starting {
560 // draining, and all remaining runners are just trying
561 // to force-kill their crunch-run procs
// shutdownIfIdle shuts the worker down if eligibleForShutdown says so,
// logging the decision and reporting an "aborted" boot outcome.
// Returns whether a shutdown was initiated.
// NOTE(review): the early "return false", the wkr.shutdown() call, and
// the final "return true" are elided in this chunk.
568 // caller must have lock.
569 func (wkr *worker) shutdownIfIdle() bool {
570 if !wkr.eligibleForShutdown() {
573 wkr.logger.WithFields(logrus.Fields{
575 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
576 "IdleBehavior": wkr.idleBehavior,
577 }).Info("shutdown worker")
578 wkr.reportBootOutcome(BootOutcomeAborted)
// shutdown moves the worker to StateShutdown and destroys the cloud
// instance, logging (but not retrying here) any destroy error.
// NOTE(review): the function header's surrounding lines and the
// goroutine/retry structure around instance.Destroy() are elided in
// this chunk.
583 // caller must have lock.
584 func (wkr *worker) shutdown() {
588 wkr.state = StateShutdown
591 err := wkr.instance.Destroy()
593 wkr.logger.WithError(err).Warn("shutdown failed")
599 // Save worker tags to cloud provider metadata, if they don't already
600 // match. Caller must have lock.
601 func (wkr *worker) saveTags() {
602 instance := wkr.instance
603 tags := instance.Tags()
// Tags we manage: instance type and idle behavior, under the
// configured key prefix.
604 update := cloud.InstanceTags{
605 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
606 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
// NOTE(review): the body comparing each desired tag to the current
// value (and the early return when nothing changed) is elided in this
// chunk; SetTags appears to run only when an update is needed.
609 for k, v := range update {
617 err := instance.SetTags(tags)
619 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
// Close stops monitoring the worker: it logs and abandons all running
// and starting crunch-run processes, then closes the SSH executor.
// NOTE(review): the wkr.mtx.Lock() matching the deferred Unlock, and
// the rr.Close()/cleanup calls inside each loop, are elided in this
// chunk.
625 func (wkr *worker) Close() {
626 // This might take time, so do it after unlocking mtx.
627 defer wkr.executor.Close()
630 defer wkr.mtx.Unlock()
631 for uuid, rr := range wkr.running {
632 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
635 for uuid, rr := range wkr.starting {
636 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
641 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
642 // probe. Returns true if anything was added or removed.
644 // Caller must have lock.
645 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
// Build the set of UUIDs the probe says are alive on the instance.
646 alive := map[string]bool{}
647 for _, uuid := range ctrUUIDs {
649 if _, ok := wkr.running[uuid]; ok {
// Promote a runner we started that the probe now confirms.
651 } else if rr, ok := wkr.starting[uuid]; ok {
652 wkr.running[uuid] = rr
653 delete(wkr.starting, uuid)
656 // We didn't start it -- it must have been
657 // started by a previous dispatcher process.
658 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
659 wkr.running[uuid] = newRemoteRunner(uuid, wkr)
// Drop entries the probe no longer reports.
// NOTE(review): the condition guarding closeRunner (presumably
// "!alive[uuid]") and the changed=true assignments are elided in this
// chunk.
663 for uuid := range wkr.running {
665 wkr.closeRunner(uuid)
// closeRunner removes the runner for the given container UUID from
// wkr.running, records its exit time in wkr.wp.exited, and flips the
// worker back to StateIdle when nothing is left running or starting.
// NOTE(review): the nil check on rr, the rr.Close() call, and the
// "now" declaration are elided in this chunk.
672 // caller must have lock.
673 func (wkr *worker) closeRunner(uuid string) {
674 rr := wkr.running[uuid]
678 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
679 delete(wkr.running, uuid)
684 wkr.wp.exited[uuid] = now
685 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
686 wkr.state = StateIdle