// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"fmt"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/stats"
	"github.com/sirupsen/logrus"
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}
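// Illustrative sketch (not part of this package's API): because State
// implements encoding.TextMarshaler, a map keyed by State encodes to
// readable JSON, e.g. in a status report:
//
//	json.Marshal(map[State]int{StateIdle: 3, StateRunning: 2})
//	// -> {"idle":3,"running":2}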
// BootOutcome is the result of a worker boot. It is used as a label
// in a metric.
type BootOutcome string

const (
	BootOutcomeFailed      BootOutcome = "failure"
	BootOutcomeSucceeded   BootOutcome = "success"
	BootOutcomeAborted     BootOutcome = "aborted"
	BootOutcomeDisappeared BootOutcome = "disappeared"
)

var validBootOutcomes = map[BootOutcome]bool{
	BootOutcomeFailed:      true,
	BootOutcomeSucceeded:   true,
	BootOutcomeAborted:     true,
	BootOutcomeDisappeared: true,
}

// IdleBehavior indicates the behavior desired when a node becomes idle.
type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

type worker struct {
	logger   logrus.FieldLogger
	executor Executor
	wp       *Pool

	mtx                 sync.Locker // must be wp's Locker
	state               State
	idleBehavior        IdleBehavior
	instance            cloud.Instance
	instType            arvados.InstanceType
	vcpus               int64
	memory              int64
	appeared            time.Time
	probed              time.Time
	updated             time.Time
	busy                time.Time
	destroyed           time.Time
	firstSSHConnection  time.Time
	lastUUID            string
	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
	probing             chan struct{}
	bootOutcomeReported bool
	timeToReadyReported bool
	staleRunLockSince   time.Time
}

func (wkr *worker) onUnkillable(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	logger := wkr.logger.WithField("ContainerUUID", uuid)
	if wkr.idleBehavior == IdleBehaviorHold {
		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
		return
	}
	logger.Warn("unkillable container, draining worker")
	wkr.setIdleBehavior(IdleBehaviorDrain)
}

func (wkr *worker) onKilled(uuid string) {
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	wkr.closeRunner(uuid)
	go wkr.wp.notify()
}

// caller must have lock.
func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
	if wkr.bootOutcomeReported {
		return
	}
	if wkr.wp.mBootOutcomes != nil {
		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
	}
	wkr.bootOutcomeReported = true
}

// caller must have lock.
func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
	if wkr.timeToReadyReported {
		return
	}
	// Check the metric we are about to use (not mTimeToSSH) so a
	// partially initialized metrics set cannot cause a nil
	// dereference here.
	if wkr.wp.mTimeToReadyForContainer != nil {
		wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
	}
	wkr.timeToReadyReported = true
}

// caller must have lock.
func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
	wkr.idleBehavior = idleBehavior
	wkr.saveTags()
	wkr.shutdownIfIdle()
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger.Debug("starting container")
	rr := newRemoteRunner(ctr.UUID, wkr)
	wkr.starting[ctr.UUID] = rr
	if wkr.state != StateRunning {
		wkr.state = StateRunning
		go wkr.wp.notify()
	}
	go func() {
		rr.Start()
		if wkr.wp.mTimeFromQueueToCrunchRun != nil {
			wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
		}
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = rr
		wkr.lastUUID = ctr.UUID
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}
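// The select in ProbeAndUpdate is a non-blocking send that acts as a
// try-lock: assuming wkr.probing is created with capacity 1 (it is
// created elsewhere in this package), at most one probe runs at a
// time and extra requests are dropped rather than queued. A minimal
// sketch of the same pattern:
//
//	probing := make(chan struct{}, 1)
//	select {
//	case probing <- struct{}{}:
//		doProbe()
//		<-probing // release
//	default:
//		// a probe is already in flight; skip this one
//	}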
// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
// updates state accordingly.
//
// In StateUnknown: Call both probeBooted and probeRunning.
// In StateBooting: Call probeBooted; if successful, call probeRunning.
// In StateRunning: Call probeRunning.
// In StateIdle: Call probeRunning.
// In StateShutdown: Do nothing.
//
// If both probes succeed, wkr.state changes to
// StateIdle/StateRunning.
//
// If probeRunning succeeds, wkr.running is updated. (This means
// wkr.running might be non-empty even in StateUnknown, if the boot
// probe failed.)
//
// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	initialState := wkr.state
	wkr.mtx.Unlock()

	var (
		booted   bool
		ctrUUIDs []string
		ok       bool
		stderr   []byte // from probeBooted
	)

	switch initialState {
	case StateShutdown:
		return
	case StateIdle, StateRunning:
		booted = true
	case StateUnknown, StateBooting:
	default:
		panic(fmt.Sprintf("unknown state %s", initialState))
	}

	probeStart := time.Now()
	logger := wkr.logger.WithField("ProbeStart", probeStart)

	if !booted {
		booted, stderr = wkr.probeBooted()
		if !booted {
			// Pretend this probe succeeded if another
			// concurrent attempt succeeded.
			wkr.mtx.Lock()
			booted = wkr.state == StateRunning || wkr.state == StateIdle
			wkr.mtx.Unlock()
		}
		if booted {
			logger.Info("instance booted; will try probeRunning")
		}
	}
	reportedBroken := false
	if booted || wkr.state == StateUnknown {
		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
		logger.Info("probe reported broken instance")
		wkr.reportBootOutcome(BootOutcomeFailed)
		wkr.setIdleBehavior(IdleBehaviorDrain)
	}
	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		// Using the start time of the probe as the timeout
		// threshold ensures we always initiate at least one
		// probe attempt after the boot/probe timeout expires
		// (otherwise, a slow probe failure could cause us to
		// shutdown an instance even though it did in fact
		// boot/recover before the timeout expired).
		dur := probeStart.Sub(wkr.probed)
		if wkr.shutdownIfBroken(dur) {
			// stderr from failed run-probes will have
			// been logged already, but boot-probe
			// failures are normal so they are logged only
			// at Debug level. This is our chance to log
			// some evidence about why the node never
			// booted, even in non-debug mode.
			if !booted {
				wkr.reportBootOutcome(BootOutcomeFailed)
				logger.WithFields(logrus.Fields{
					"Duration": dur,
					"stderr":   string(stderr),
				}).Info("boot failed")
			}
		}
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}

	changed := wkr.updateRunning(ctrUUIDs)

	// Update state if this was the first successful boot-probe.
	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
		if wkr.state == StateBooting {
			wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
		}
		// Note: this will change again below if
		// len(wkr.starting)+len(wkr.running) > 0.
		wkr.state = StateIdle
		changed = true
	}

	// If wkr.state and wkr.running aren't changing then there's
	// no need to log anything, notify the scheduler, move state
	// back and forth between idle/running, etc.
	if !changed {
		return
	}

	// Log whenever a run-probe reveals crunch-run processes
	// appearing/disappearing before boot-probe succeeds.
	if wkr.state == StateUnknown {
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("crunch-run probe succeeded, but boot probe is still failing")
	}

	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
		wkr.state = StateRunning
	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
		wkr.state = StateIdle
	}
	wkr.updated = updateTime
	if booted && (initialState == StateUnknown || initialState == StateBooting) {
		wkr.reportBootOutcome(BootOutcomeSucceeded)
		logger.WithFields(logrus.Fields{
			"RunningContainers": len(wkr.running),
			"State":             wkr.state,
		}).Info("probes succeeded, instance is in service")
	}
	go wkr.wp.notify()
}

func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
	cmd := wkr.wp.runnerCmd + " --list"
	if u := wkr.instance.RemoteUser(); u != "root" {
		cmd = "sudo " + cmd
	}
	before := time.Now()
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Since(before).Seconds())
		return
	}
	wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Since(before).Seconds())
	ok = true

	staleRunLock := false
	for _, s := range strings.Split(string(stdout), "\n") {
		// Each line of the "crunch-run --list" output is one
		// of the following:
		//
		// * a container UUID, indicating that processes
		//   related to that container are currently running.
		//   Optionally followed by " stale", indicating that
		//   the crunch-run process itself has exited (the
		//   remaining process is probably arv-mount).
		//
		// * the string "broken", indicating that the instance
		//   appears incapable of starting containers.
		//
		// See ListProcesses() in lib/crunchrun/background.go.
		if s == "" {
			// empty string following final newline
		} else if s == "broken" {
			reportsBroken = true
		} else if toks := strings.Split(s, " "); len(toks) == 1 {
			running = append(running, s)
		} else if toks[1] == "stale" {
			wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
			staleRunLock = true
		}
	}
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !staleRunLock {
		wkr.staleRunLockSince = time.Time{}
	} else if wkr.staleRunLockSince.IsZero() {
		wkr.staleRunLockSince = time.Now()
	} else if dur := time.Since(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
		wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
		reportsBroken = true
	}
	return
}
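// For illustration, a successful probe's stdout might look like this
// (the UUIDs are made up):
//
//	zzzzz-dz642-aaaaaaaaaaaaaaa
//	zzzzz-dz642-bbbbbbbbbbbbbbb stale
//	broken
//
// probeRunning would parse that as one running container, one stale
// run lock (logged and tracked via staleRunLockSince), and a
// reportsBroken=true result.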
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	if err = wkr.wp.loadRunnerData(); err != nil {
		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
		return false, stderr
	} else if len(wkr.wp.runnerData) == 0 {
		// Assume crunch-run is already installed
	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
		return false, stderr2
	} else {
		stderr = append(stderr, stderr2...)
	}
	return true, stderr
}

func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
	logger := wkr.logger.WithFields(logrus.Fields{
		"hash": hash,
		"path": wkr.wp.runnerCmd,
	})

	// md5sum output is "<hash>  <path>\n" -- note two spaces
	// between hash and path.
	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
		logger.Info("runner binary already exists on worker, with correct hash")
		return
	}

	// Note touch+chmod come before writing data, to avoid the
	// possibility of md5 being correct while file mode is
	// incorrect.
	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
	if wkr.instance.RemoteUser() != "root" {
		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
	}
	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
	return
}
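// For illustration (hypothetical path, non-root remote user), the
// installer command built above comes out as:
//
//	sudo sh -c 'set -e; dstdir="/tmp/arvados/"; dstfile="/tmp/arvados/crunch-run"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"'
//
// The strings.Replace call escapes any single quotes in the embedded
// script ('\”) so the script stays intact when wrapped in the
// single-quoted sudo sh -c argument.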
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		// Never shut down.
		return false
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
	return true
}

// Returns true if the instance is eligible for shutdown: either it's
// been idle too long, or idleBehavior=Drain and nothing is running.
//
// caller must have lock.
func (wkr *worker) eligibleForShutdown() bool {
	if wkr.idleBehavior == IdleBehaviorHold {
		return false
	}
	draining := wkr.idleBehavior == IdleBehaviorDrain
	switch wkr.state {
	case StateBooting:
		return draining
	case StateIdle:
		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
	case StateRunning:
		if !draining {
			return false
		}
		for _, rr := range wkr.running {
			if !rr.givenup {
				return false
			}
		}
		for _, rr := range wkr.starting {
			if !rr.givenup {
				return false
			}
		}
		// draining, and all remaining runners are just trying
		// to force-kill their crunch-run procs
		return true
	default:
		return false
	}
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if !wkr.eligibleForShutdown() {
		return false
	}
	wkr.logger.WithFields(logrus.Fields{
		"State":        wkr.state,
		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
		"IdleBehavior": wkr.idleBehavior,
	}).Info("shutdown worker")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		if err := wkr.instance.Destroy(); err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
		}
	}()
}
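// For quick reference, eligibleForShutdown above reduces to the
// following table (states not listed are never eligible):
//
//	IdleBehavior  StateBooting  StateIdle                 StateRunning
//	hold          no            no                        no
//	run           no            idle time >= timeoutIdle  no
//	drain         yes           yes                       only when every runner has given up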
// Save worker tags to cloud provider metadata, if they don't already
// match. Caller must have lock.
func (wkr *worker) saveTags() {
	instance := wkr.instance
	tags := instance.Tags()
	update := cloud.InstanceTags{
		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
	}
	save := false
	for k, v := range update {
		if tags[k] != v {
			tags[k] = v
			save = true
		}
	}
	if save {
		go func() {
			err := instance.SetTags(tags)
			if err != nil {
				wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warn("error updating tags")
			}
		}()
	}
}

func (wkr *worker) Close() {
	// This might take time, so do it after unlocking mtx.
	defer wkr.executor.Close()

	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	for uuid, rr := range wkr.running {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
	for uuid, rr := range wkr.starting {
		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
		rr.Close()
	}
}

// Add/remove entries in wkr.running to match ctrUUIDs returned by a
// probe. Returns true if anything was added or removed.
//
// Caller must have lock.
func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
	alive := map[string]bool{}
	for _, uuid := range ctrUUIDs {
		alive[uuid] = true
		if _, ok := wkr.running[uuid]; ok {
			// unchanged
		} else if rr, ok := wkr.starting[uuid]; ok {
			wkr.running[uuid] = rr
			delete(wkr.starting, uuid)
			changed = true
		} else {
			// We didn't start it -- it must have been
			// started by a previous dispatcher process.
			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
			changed = true
		}
	}
	for uuid := range wkr.running {
		if !alive[uuid] {
			wkr.closeRunner(uuid)
			changed = true
		}
	}
	return
}

// caller must have lock.
func (wkr *worker) closeRunner(uuid string) {
	rr := wkr.running[uuid]
	if rr == nil {
		return
	}
	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
	delete(wkr.running, uuid)
	rr.Close()

	now := time.Now()
	wkr.updated = now
	wkr.wp.exited[uuid] = now
	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
		wkr.state = StateIdle
	}
}
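// Illustrative reconciliation for updateRunning (hypothetical UUIDs):
//
//	// before: running = {"A": rrA}, starting = {"C": rrC}
//	changed := wkr.updateRunning([]string{"A", "C"})
//	// after:  running = {"A": rrA, "C": rrC}, starting = {}, changed == true
//	// A UUID present in running but absent from the probe result would
//	// be passed to closeRunner, which records its exit time in wkr.wp.exited.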