1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.arvados.org/arvados.git/lib/cloud"
16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "git.arvados.org/arvados.git/sdk/go/stats"
18 "github.com/sirupsen/logrus"
23 maxPingFailTime = 10 * time.Minute
26 // State indicates whether a worker is available to do work, and (if
27 // not) whether/when it is expected to become ready.
// NOTE(review): this listing is a sampled excerpt (the embedded line
// numbers jump); the `type State int` declaration and the `const (`
// opener fall in elided lines. Do not restructure without the full file.
31 StateUnknown State = iota // might be running a container already
32 StateBooting // instance is booting
33 StateIdle // instance booted, no containers are running
34 StateRunning // instance is running one or more containers
35 StateShutdown // worker has stopped monitoring the instance
// stateString maps each State to its human-readable label, consumed by
// String() and MarshalText() below.
38 var stateString = map[State]string{
39 StateUnknown: "unknown",
40 StateBooting: "booting",
// NOTE(review): the StateIdle entry (embedded line 41, presumably
// `StateIdle: "idle"`) is elided from this excerpt — confirm against
// the full file before assuming it is missing.
42 StateRunning: "running",
43 StateShutdown: "shutdown",
46 // String implements fmt.Stringer.
47 func (s State) String() string {
// (body elided in this excerpt — presumably `return stateString[s]`;
// TODO confirm against the full file)
51 // MarshalText implements encoding.TextMarshaler so a JSON encoding of
52 // map[State]anything uses the state's string representation.
53 func (s State) MarshalText() ([]byte, error) {
// An unknown State yields the map zero value (empty string), never an
// error.
54 return []byte(stateString[s]), nil
// (closing brace elided in this excerpt)
57 // BootOutcome is the result of a worker boot. It is used as a label in a metric.
58 type BootOutcome string
// (elided: `const (` opener for the outcome values below)
61 BootOutcomeFailed BootOutcome = "failure"
62 BootOutcomeSucceeded BootOutcome = "success"
63 BootOutcomeAborted BootOutcome = "aborted"
64 BootOutcomeDisappeared BootOutcome = "disappeared"
// validBootOutcomes enumerates the recognized outcome label values.
// Its use site is not visible in this excerpt — presumably metric
// initialization elsewhere in the package; confirm in the full file.
67 var validBootOutcomes = map[BootOutcome]bool{
68 BootOutcomeFailed: true,
69 BootOutcomeSucceeded: true,
70 BootOutcomeAborted: true,
71 BootOutcomeDisappeared: true,
74 // IdleBehavior indicates the behavior desired when a node becomes idle.
75 type IdleBehavior string
// (elided: `const (` opener for the behavior values below)
78 IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
79 IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers
80 IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
// validIdleBehavior enumerates the accepted IdleBehavior values. Its
// use site (presumably input validation) is not visible in this
// excerpt — confirm in the full file.
83 var validIdleBehavior = map[IdleBehavior]bool{
84 IdleBehaviorRun: true,
85 IdleBehaviorHold: true,
86 IdleBehaviorDrain: true,
// NOTE(review): the `type worker struct {` opener and several fields
// referenced by the methods below (wp, executor, state, probed, busy,
// updated, lastUUID) fall in elided lines of this excerpt.
90 logger logrus.FieldLogger
94 mtx sync.Locker // must be wp's Locker.
96 idleBehavior IdleBehavior
97 instance cloud.Instance
98 instType arvados.InstanceType
// firstSSHConnection is the baseline for the time-to-ready metric (see
// reportTimeBetweenFirstSSHAndReadyForContainer).
106 firstSSHConnection time.Time
108 running map[string]*remoteRunner // remember to update state idle<->running when this changes
109 starting map[string]*remoteRunner // remember to update state idle<->running when this changes
110 probing chan struct{} // coalesces concurrent probes; see ProbeAndUpdate
111 bootOutcomeReported bool // set once by reportBootOutcome
112 timeToReadyReported bool // set once by reportTimeBetweenFirstSSHAndReadyForContainer
113 staleRunLockSince time.Time // when a stale run lock was first seen (zero if none); see probeRunning
// onUnkillable handles a container whose crunch-run process cannot be
// killed: unless the worker is held, drain it so no further containers
// are scheduled onto a node with a stuck process.
116 func (wkr *worker) onUnkillable(uuid string) {
// NOTE(review): the matching wkr.mtx.Lock() (embedded line 117) is
// elided from this excerpt; the defer below implies it.
118 defer wkr.mtx.Unlock()
119 logger := wkr.logger.WithField("ContainerUUID", uuid)
120 if wkr.idleBehavior == IdleBehaviorHold {
121 logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
// (elided: presumably an early return so a held worker is not drained
// — confirm in full file)
124 logger.Warn("unkillable container, draining worker")
125 wkr.setIdleBehavior(IdleBehaviorDrain)
// onKilled is invoked after a container's crunch-run process has been
// confirmed killed; it removes the runner from wkr.running (and updates
// worker state) via closeRunner.
128 func (wkr *worker) onKilled(uuid string) {
// (elided: wkr.mtx.Lock(), implied by the defer below)
130 defer wkr.mtx.Unlock()
131 wkr.closeRunner(uuid)
135 // caller must have lock.
// reportBootOutcome increments the boot-outcome metric for this worker,
// at most once per worker lifetime (guarded by bootOutcomeReported).
136 func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
137 if wkr.bootOutcomeReported {
// (elided: return)
140 if wkr.wp.mBootOutcomes != nil {
141 wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
143 wkr.bootOutcomeReported = true
146 // caller must have lock.
// Records, at most once per worker (guarded by timeToReadyReported),
// the delay between the first successful SSH connection and the worker
// becoming ready to run containers.
147 func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
148 if wkr.timeToReadyReported {
// (elided: return)
151 if wkr.wp.mTimeToSSH != nil {
// NOTE(review): the guard checks mTimeToSSH but the observation goes
// to mTimeToReadyForContainer — looks like a copy/paste slip. If the
// two metrics can be nil independently this risks a nil-pointer call;
// consider guarding on mTimeToReadyForContainer instead. Confirm
// against metric setup in the full file.
152 wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
154 wkr.timeToReadyReported = true
157 // caller must have lock.
// setIdleBehavior records and logs the new idle behavior. (Elided lines
// presumably also persist it, e.g. via saveTags — confirm in full file.)
158 func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
159 wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
160 wkr.idleBehavior = idleBehavior
165 // caller must have lock.
// startContainer launches ctr on this worker: it registers a new
// remoteRunner in wkr.starting, marks the worker Running, and (in
// elided lines, presumably a goroutine) starts the runner, after which
// the runner is moved from wkr.starting to wkr.running.
166 func (wkr *worker) startContainer(ctr arvados.Container) {
167 logger := wkr.logger.WithFields(logrus.Fields{
168 "ContainerUUID": ctr.UUID,
169 "Priority": ctr.Priority,
171 logger.Debug("starting container")
172 rr := newRemoteRunner(ctr.UUID, wkr)
173 wkr.starting[ctr.UUID] = rr
174 if wkr.state != StateRunning {
175 wkr.state = StateRunning
// (elided: presumably a goroutine wrapping the remainder; the
// defer/delete below imply it re-acquires wkr.mtx)
// Observe queue-to-crunch-run latency, if the metric is configured.
180 if wkr.wp.mTimeFromQueueToCrunchRun != nil {
181 wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
184 defer wkr.mtx.Unlock()
// Promote the runner from "starting" to "running".
188 delete(wkr.starting, ctr.UUID)
189 wkr.running[ctr.UUID] = rr
190 wkr.lastUUID = ctr.UUID
194 // ProbeAndUpdate conducts appropriate boot/running probes (if any)
195 // for the worker's current state. If a previous probe is still
196 // running, it does nothing.
198 // It should be called in a new goroutine.
199 func (wkr *worker) ProbeAndUpdate() {
// Non-blocking send on wkr.probing coalesces concurrent calls: only
// the winner runs probeAndUpdate (in elided lines); losers fall
// through to the default case below.
201 case wkr.probing <- struct{}{}:
205 wkr.logger.Debug("still waiting for last probe to finish")
209 // probeAndUpdate calls probeBooted and/or probeRunning if needed, and
210 // updates state accordingly.
212 // In StateUnknown: Call both probeBooted and probeRunning.
213 // In StateBooting: Call probeBooted; if successful, call probeRunning.
214 // In StateRunning: Call probeRunning.
215 // In StateIdle: Call probeRunning.
216 // In StateShutdown: Do nothing.
218 // If both probes succeed, wkr.state changes to
219 // StateIdle/StateRunning.
221 // If probeRunning succeeds, wkr.running is updated. (This means
222 // wkr.running might be non-empty even in StateUnknown, if the boot
225 // probeAndUpdate should be called in a new goroutine.
// NOTE(review): this excerpt is heavily gapped (locking, several
// returns, and var declarations are elided); comments below annotate
// only the visible lines.
226 func (wkr *worker) probeAndUpdate() {
// Snapshot state under lock (lock/unlock lines elided) so we can
// detect concurrent updates after the unlocked probe phase.
228 updated := wkr.updated
229 initialState := wkr.state
236 stderr []byte // from probeBooted
// Decide which probes to run based on the state at snapshot time.
239 switch initialState {
242 case StateIdle, StateRunning:
244 case StateUnknown, StateBooting:
246 panic(fmt.Sprintf("unknown state %s", initialState))
249 probeStart := time.Now()
250 logger := wkr.logger.WithField("ProbeStart", probeStart)
253 booted, stderr = wkr.probeBooted()
255 // Pretend this probe succeeded if another
256 // concurrent attempt succeeded.
258 booted = wkr.state == StateRunning || wkr.state == StateIdle
262 logger.Info("instance booted; will try probeRunning")
265 reportedBroken := false
// Run-probe even in StateUnknown (boot probe may still be failing)
// so pre-existing crunch-run processes are discovered.
266 if booted || wkr.state == StateUnknown {
267 ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
270 defer wkr.mtx.Unlock()
271 if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
272 logger.Info("probe reported broken instance")
273 wkr.reportBootOutcome(BootOutcomeFailed)
274 wkr.setIdleBehavior(IdleBehaviorDrain)
276 if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
277 if wkr.state == StateShutdown && wkr.updated.After(updated) {
278 // Skip the logging noise if shutdown was
279 // initiated during probe.
282 // Using the start time of the probe as the timeout
283 // threshold ensures we always initiate at least one
284 // probe attempt after the boot/probe timeout expires
285 // (otherwise, a slow probe failure could cause us to
286 // shutdown an instance even though it did in fact
287 // boot/recover before the timeout expired).
288 dur := probeStart.Sub(wkr.probed)
289 if wkr.shutdownIfBroken(dur) {
290 // stderr from failed run-probes will have
291 // been logged already, but boot-probe
292 // failures are normal so they are logged only
293 // at Debug level. This is our chance to log
294 // some evidence about why the node never
295 // booted, even in non-debug mode.
297 wkr.reportBootOutcome(BootOutcomeFailed)
298 logger.WithFields(logrus.Fields{
300 "stderr": string(stderr),
301 }).Info("boot failed")
// From here on, at least one probe succeeded: record probe time and
// reconcile state/bookkeeping.
307 updateTime := time.Now()
308 wkr.probed = updateTime
310 if updated != wkr.updated {
311 // Worker was updated after the probe began, so
312 // wkr.running might have a container UUID that was
313 // not yet running when ctrUUIDs was generated. Leave
314 // wkr.running alone and wait for the next probe to
315 // catch up on any changes.
316 logger.WithFields(logrus.Fields{
318 "wkr.updated": wkr.updated,
319 }).Debug("skipping worker state update due to probe/sync race")
323 if len(ctrUUIDs) > 0 {
324 wkr.busy = updateTime
325 wkr.lastUUID = ctrUUIDs[0]
326 } else if len(wkr.running) > 0 {
327 // Actual last-busy time was sometime between wkr.busy
328 // and now. Now is the earliest opportunity to take
329 // advantage of the non-busy state, though.
330 wkr.busy = updateTime
333 changed := wkr.updateRunning(ctrUUIDs)
335 // Update state if this was the first successful boot-probe.
336 if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
337 if wkr.state == StateBooting {
338 wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
340 // Note: this will change again below if
341 // len(wkr.starting)+len(wkr.running) > 0.
342 wkr.state = StateIdle
346 // If wkr.state and wkr.running aren't changing then there's
347 // no need to log anything, notify the scheduler, move state
348 // back and forth between idle/running, etc.
353 // Log whenever a run-probe reveals crunch-run processes
354 // appearing/disappearing before boot-probe succeeds.
355 if wkr.state == StateUnknown && changed {
356 logger.WithFields(logrus.Fields{
357 "RunningContainers": len(wkr.running),
359 }).Info("crunch-run probe succeeded, but boot probe is still failing")
// Reconcile idle/running state with the current container counts.
362 if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
363 wkr.state = StateRunning
364 } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
365 wkr.state = StateIdle
367 wkr.updated = updateTime
368 if booted && (initialState == StateUnknown || initialState == StateBooting) {
369 wkr.reportBootOutcome(BootOutcomeSucceeded)
370 logger.WithFields(logrus.Fields{
371 "RunningContainers": len(wkr.running),
373 }).Info("probes succeeded, instance is in service")
// probeRunning executes "crunch-run --list" on the instance and parses
// its output into: the container UUIDs currently running there, whether
// the instance reported itself broken, and whether the probe itself
// succeeded.
378 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
379 cmd := wkr.wp.runnerCmd + " --list"
// (elided: non-root command is presumably wrapped with sudo — confirm
// in full file)
380 if u := wkr.instance.RemoteUser(); u != "root" {
384 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
386 wkr.logger.WithFields(logrus.Fields{
388 "stdout": string(stdout),
389 "stderr": string(stderr),
390 }).WithError(err).Warn("probe failed")
// NOTE(review): time.Since(before) is the idiomatic form of
// time.Now().Sub(before) here and below (`before` is set in elided
// lines).
391 wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
394 wkr.logger.WithFields(logrus.Fields{
396 "stdout": string(stdout),
397 "stderr": string(stderr),
398 }).Debug("probe succeeded")
399 wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
402 staleRunLock := false
403 for _, s := range strings.Split(string(stdout), "\n") {
404 // Each line of the "crunch-run --list" output is one
407 // * a container UUID, indicating that processes
408 // related to that container are currently running.
409 // Optionally followed by " stale", indicating that
410 // the crunch-run process itself has exited (the
411 // remaining process is probably arv-mount).
413 // * the string "broken", indicating that the instance
414 // appears incapable of starting containers.
416 // See ListProcesses() in lib/crunchrun/background.go.
418 // empty string following final newline
419 } else if s == "broken" {
421 } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
422 // Ignore crunch-run processes that belong to
423 // a different cluster (e.g., a single host
424 // running multiple clusters with the loopback
427 } else if toks := strings.Split(s, " "); len(toks) == 1 {
428 running = append(running, s)
429 } else if toks[1] == "stale" {
430 wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
435 defer wkr.mtx.Unlock()
// Track how long a stale run lock has persisted; report the instance
// broken once it exceeds timeoutStaleRunLock.
437 wkr.staleRunLockSince = time.Time{}
438 } else if wkr.staleRunLockSince.IsZero() {
439 wkr.staleRunLockSince = time.Now()
440 } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
441 wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
// probeBooted runs the configured boot-probe command on the instance
// and, on success, ensures the crunch-run binary is installed there
// (copying it if runnerData is configured). It returns ok=false and the
// probe/copy stderr on failure.
447 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
448 cmd := wkr.wp.bootProbeCommand
// (elided: default command / sudo wrapping — confirm in full file)
452 stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
453 logger := wkr.logger.WithFields(logrus.Fields{
455 "stdout": string(stdout),
456 "stderr": string(stderr),
// Boot-probe failures are normal while an instance comes up, so they
// are logged only at Debug level (see probeAndUpdate for the one-time
// Info-level "boot failed" summary).
459 logger.WithError(err).Debug("boot probe failed")
462 logger.Info("boot probe succeeded")
463 if err = wkr.wp.loadRunnerData(); err != nil {
464 wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
466 } else if len(wkr.wp.runnerData) == 0 {
467 // Assume crunch-run is already installed
468 } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
469 wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
470 return false, stderr2
// Success path: append any copy stderr to the probe stderr.
472 stderr = append(stderr, stderr2...)
// copyRunnerData installs the crunch-run binary at wp.runnerCmd on the
// instance, skipping the copy when the remote file already matches the
// expected MD5 hash.
477 func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
478 hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
479 dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
480 logger := wkr.logger.WithFields(logrus.Fields{
482 "path": wkr.wp.runnerCmd,
// Fast path: remote md5sum output matches "<hash>  <path>\n" exactly.
485 stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
486 if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
487 logger.Info("runner binary already exists on worker, with correct hash")
491 // Note touch+chmod come before writing data, to avoid the
492 // possibility of md5 being correct while file mode is
// Stream the binary to the remote path via `cat`, with sudo wrapping
// (and single-quote escaping) for non-root remote users.
494 cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
495 if wkr.instance.RemoteUser() != "root" {
496 cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
498 logger.WithField("cmd", cmd).Info("installing runner binary on worker")
499 stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
503 // caller must have lock.
// shutdownIfBroken shuts the worker down (returning true) if it has
// been unresponsive for longer than the applicable timeout — the boot
// timeout for new instances, the probe timeout otherwise. Held workers
// are never shut down.
504 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
505 if wkr.idleBehavior == IdleBehaviorHold {
// (elided: return false for held workers, and the dur<threshold check
// — confirm in full file)
509 label, threshold := "", wkr.wp.timeoutProbe
510 if wkr.state == StateUnknown || wkr.state == StateBooting {
511 label, threshold = "new ", wkr.wp.timeoutBooting
516 wkr.logger.WithFields(logrus.Fields{
520 }).Warnf("%sinstance unresponsive, shutting down", label)
525 // Returns true if the instance is eligible for shutdown: either it's
526 // been idle too long, or idleBehavior=Drain and nothing is running.
528 // caller must have lock.
529 func (wkr *worker) eligibleForShutdown() bool {
530 if wkr.idleBehavior == IdleBehaviorHold {
// (elided: return false — a held worker is never eligible)
533 draining := wkr.idleBehavior == IdleBehaviorDrain
// Idle case: drain always qualifies; otherwise require the idle
// timeout to have elapsed since the worker was last busy.
538 return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
// (elided: the non-idle branch inspects remaining runners)
543 for _, rr := range wkr.running {
548 for _, rr := range wkr.starting {
553 // draining, and all remaining runners are just trying
554 // to force-kill their crunch-run procs
561 // caller must have lock.
// shutdownIfIdle shuts the worker down (returning true) when
// eligibleForShutdown says it may; otherwise returns false.
562 func (wkr *worker) shutdownIfIdle() bool {
563 if !wkr.eligibleForShutdown() {
// (elided: return false)
566 wkr.logger.WithFields(logrus.Fields{
568 "IdleDuration": stats.Duration(time.Since(wkr.busy)),
569 "IdleBehavior": wkr.idleBehavior,
570 }).Info("shutdown worker")
// An idle shutdown before a successful boot probe counts as an aborted
// boot (reportBootOutcome is a no-op after the first report).
571 wkr.reportBootOutcome(BootOutcomeAborted)
576 // caller must have lock.
// shutdown moves the worker to StateShutdown and destroys the cloud
// instance (the Destroy call presumably runs in an elided goroutine —
// confirm in full file).
577 func (wkr *worker) shutdown() {
581 wkr.state = StateShutdown
584 err := wkr.instance.Destroy()
586 wkr.logger.WithError(err).Warn("shutdown failed")
592 // Save worker tags to cloud provider metadata, if they don't already
593 // match. Caller must have lock.
594 func (wkr *worker) saveTags() {
595 instance := wkr.instance
596 tags := instance.Tags()
// Desired values for the tags this dispatcher owns (instance type and
// idle behavior), namespaced by the configured tag-key prefix.
597 update := cloud.InstanceTags{
598 wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
599 wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
// (elided: loop body presumably detects mismatches and sets a "save
// needed" flag; SetTags below presumably runs only when needed, in a
// goroutine — confirm in full file)
602 for k, v := range update {
610 err := instance.SetTags(tags)
612 wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
// Close releases the worker's resources: it abandons all running and
// starting crunch-run processes (elided lines presumably call
// rr.Close() on each — confirm in full file) and closes the SSH
// executor after releasing the lock.
618 func (wkr *worker) Close() {
619 // This might take time, so do it after unlocking mtx.
620 defer wkr.executor.Close()
// (elided: wkr.mtx.Lock(), implied by the defer below)
623 defer wkr.mtx.Unlock()
624 for uuid, rr := range wkr.running {
625 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
628 for uuid, rr := range wkr.starting {
629 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
634 // Add/remove entries in wkr.running to match ctrUUIDs returned by a
635 // probe. Returns true if anything was added or removed.
637 // Caller must have lock.
638 func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
// alive marks UUIDs reported by the probe, so the second loop can
// close runners the probe no longer sees.
639 alive := map[string]bool{}
640 for _, uuid := range ctrUUIDs {
642 if _, ok := wkr.running[uuid]; ok {
// already running — nothing to do
644 } else if rr, ok := wkr.starting[uuid]; ok {
// probe observed a container we were starting: promote it
645 wkr.running[uuid] = rr
646 delete(wkr.starting, uuid)
649 // We didn't start it -- it must have been
650 // started by a previous dispatcher process.
651 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
652 wkr.running[uuid] = newRemoteRunner(uuid, wkr)
// Drop runners the probe no longer reports (membership test on alive
// is in an elided line).
656 for uuid := range wkr.running {
658 wkr.closeRunner(uuid)
665 // caller must have lock.
// closeRunner removes uuid's runner from wkr.running, records the exit
// in wp.exited, and flips the worker back to StateIdle when nothing is
// left running or starting. (The function continues past the end of
// this excerpt.)
666 func (wkr *worker) closeRunner(uuid string) {
667 rr := wkr.running[uuid]
// (elided: nil check on rr, and presumably rr.Close())
671 wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
672 delete(wkr.running, uuid)
// (elided: `now := time.Now()`, implied by the assignment below)
677 wkr.wp.exited[uuid] = now
678 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
679 wkr.state = StateIdle