package worker
import (
+ "bytes"
+ "fmt"
+ "path/filepath"
+ "strings"
+ "sync"
"time"
+
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/stats"
+ "github.com/sirupsen/logrus"
+)
+
+const (
+ // TODO: configurable
+ maxPingFailTime = 10 * time.Minute
)
// State indicates whether a worker is available to do work, and (if
const (
StateUnknown State = iota // might be running a container already
StateBooting // instance is booting
- StateRunning // instance is running
+ StateIdle // instance booted, no containers are running
+ StateRunning // instance is running one or more containers
StateShutdown // worker has stopped monitoring the instance
- StateHold // running, but not available to run new containers
-)
-
-const (
- // TODO: configurable
- maxPingFailTime = 10 * time.Minute
)
var stateString = map[State]string{
StateUnknown: "unknown",
StateBooting: "booting",
+ StateIdle: "idle",
StateRunning: "running",
StateShutdown: "shutdown",
- StateHold: "hold",
}
// String implements fmt.Stringer.
func (s State) MarshalText() ([]byte, error) {
return []byte(stateString[s]), nil
}
+
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+ BootOutcomeFailed BootOutcome = "failure"
+ BootOutcomeSucceeded BootOutcome = "success"
+ BootOutcomeAborted BootOutcome = "aborted"
+ BootOutcomeDisappeared BootOutcome = "disappeared"
+)
+
+var validBootOutcomes = map[BootOutcome]bool{
+ BootOutcomeFailed: true,
+ BootOutcomeSucceeded: true,
+ BootOutcomeAborted: true,
+ BootOutcomeDisappeared: true,
+}
+
+// IdleBehavior indicates the behavior desired when a node becomes idle.
+type IdleBehavior string
+
+const (
+ IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
+ IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers
+ IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
+)
+
+var validIdleBehavior = map[IdleBehavior]bool{
+ IdleBehaviorRun: true,
+ IdleBehaviorHold: true,
+ IdleBehaviorDrain: true,
+}
+
+type worker struct {
+ logger logrus.FieldLogger
+ executor Executor
+ wp *Pool
+
+ mtx sync.Locker // must be wp's Locker.
+ state State
+ idleBehavior IdleBehavior
+ instance cloud.Instance
+ instType arvados.InstanceType
+ vcpus int64
+ memory int64
+ appeared time.Time
+ probed time.Time
+ updated time.Time
+ busy time.Time
+ destroyed time.Time
+ firstSSHConnection time.Time
+ lastUUID string
+ running map[string]*remoteRunner // remember to update state idle<->running when this changes
+ starting map[string]*remoteRunner // remember to update state idle<->running when this changes
+ probing chan struct{}
+ bootOutcomeReported bool
+ timeToReadyReported bool
+ staleRunLockSince time.Time
+}
+
+func (wkr *worker) onUnkillable(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ logger := wkr.logger.WithField("ContainerUUID", uuid)
+ if wkr.idleBehavior == IdleBehaviorHold {
+ logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+ return
+ }
+ logger.Warn("unkillable container, draining worker")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ wkr.closeRunner(uuid)
+ go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+ if wkr.bootOutcomeReported {
+ return
+ }
+ if wkr.wp.mBootOutcomes != nil {
+ wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+ }
+ wkr.bootOutcomeReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) reportTimeBetweenFirstSSHAndReadyForContainer() {
+ if wkr.timeToReadyReported {
+ return
+ }
+ if wkr.wp.mTimeToSSH != nil {
+ wkr.wp.mTimeToReadyForContainer.Observe(time.Since(wkr.firstSSHConnection).Seconds())
+ }
+ wkr.timeToReadyReported = true
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+ wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+}
+
+// caller must have lock.
+func (wkr *worker) startContainer(ctr arvados.Container) {
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "Priority": ctr.Priority,
+ })
+ logger.Debug("starting container")
+ rr := newRemoteRunner(ctr.UUID, wkr)
+ wkr.starting[ctr.UUID] = rr
+ if wkr.state != StateRunning {
+ wkr.state = StateRunning
+ go wkr.wp.notify()
+ }
+ go func() {
+ rr.Start()
+ if wkr.wp.mTimeFromQueueToCrunchRun != nil {
+ wkr.wp.mTimeFromQueueToCrunchRun.Observe(time.Since(ctr.CreatedAt).Seconds())
+ }
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ now := time.Now()
+ wkr.updated = now
+ wkr.busy = now
+ delete(wkr.starting, ctr.UUID)
+ wkr.running[ctr.UUID] = rr
+ wkr.lastUUID = ctr.UUID
+ }()
+}
+
+// ProbeAndUpdate conducts appropriate boot/running probes (if any)
+// for the worker's current state. If a previous probe is still
+// running, it does nothing.
+//
+// It should be called in a new goroutine.
+func (wkr *worker) ProbeAndUpdate() {
+ select {
+ case wkr.probing <- struct{}{}:
+ wkr.probeAndUpdate()
+ <-wkr.probing
+ default:
+ wkr.logger.Debug("still waiting for last probe to finish")
+ }
+}
+
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
+func (wkr *worker) probeAndUpdate() {
+ wkr.mtx.Lock()
+ updated := wkr.updated
+ initialState := wkr.state
+ wkr.mtx.Unlock()
+
+ var (
+ booted bool
+ ctrUUIDs []string
+ ok bool
+ stderr []byte // from probeBooted
+ )
+
+ switch initialState {
+ case StateShutdown:
+ return
+ case StateIdle, StateRunning:
+ booted = true
+ case StateUnknown, StateBooting:
+ default:
+ panic(fmt.Sprintf("unknown state %s", initialState))
+ }
+
+ probeStart := time.Now()
+ logger := wkr.logger.WithField("ProbeStart", probeStart)
+
+ if !booted {
+ booted, stderr = wkr.probeBooted()
+ if !booted {
+ // Pretend this probe succeeded if another
+ // concurrent attempt succeeded.
+ wkr.mtx.Lock()
+ booted = wkr.state == StateRunning || wkr.state == StateIdle
+ wkr.mtx.Unlock()
+ }
+ if booted {
+ logger.Info("instance booted; will try probeRunning")
+ }
+ }
+ reportedBroken := false
+ if booted || wkr.state == StateUnknown {
+ ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
+ }
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+ logger.Info("probe reported broken instance")
+ wkr.reportBootOutcome(BootOutcomeFailed)
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+ }
+ if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
+ if wkr.state == StateShutdown && wkr.updated.After(updated) {
+ // Skip the logging noise if shutdown was
+ // initiated during probe.
+ return
+ }
+ // Using the start time of the probe as the timeout
+ // threshold ensures we always initiate at least one
+ // probe attempt after the boot/probe timeout expires
+ // (otherwise, a slow probe failure could cause us to
+ // shutdown an instance even though it did in fact
+ // boot/recover before the timeout expired).
+ dur := probeStart.Sub(wkr.probed)
+ if wkr.shutdownIfBroken(dur) {
+ // stderr from failed run-probes will have
+ // been logged already, but boot-probe
+ // failures are normal so they are logged only
+ // at Debug level. This is our chance to log
+ // some evidence about why the node never
+ // booted, even in non-debug mode.
+ if !booted {
+ wkr.reportBootOutcome(BootOutcomeFailed)
+ logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "stderr": string(stderr),
+ }).Info("boot failed")
+ }
+ }
+ return
+ }
+
+ updateTime := time.Now()
+ wkr.probed = updateTime
+
+ if updated != wkr.updated {
+ // Worker was updated after the probe began, so
+ // wkr.running might have a container UUID that was
+ // not yet running when ctrUUIDs was generated. Leave
+ // wkr.running alone and wait for the next probe to
+ // catch up on any changes.
+ return
+ }
+
+ if len(ctrUUIDs) > 0 {
+ wkr.busy = updateTime
+ wkr.lastUUID = ctrUUIDs[0]
+ } else if len(wkr.running) > 0 {
+ // Actual last-busy time was sometime between wkr.busy
+ // and now. Now is the earliest opportunity to take
+ // advantage of the non-busy state, though.
+ wkr.busy = updateTime
+ }
+
+ changed := wkr.updateRunning(ctrUUIDs)
+
+ // Update state if this was the first successful boot-probe.
+ if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+ if wkr.state == StateBooting {
+ wkr.reportTimeBetweenFirstSSHAndReadyForContainer()
+ }
+ // Note: this will change again below if
+ // len(wkr.starting)+len(wkr.running) > 0.
+ wkr.state = StateIdle
+ changed = true
+ }
+
+ // If wkr.state and wkr.running aren't changing then there's
+ // no need to log anything, notify the scheduler, move state
+ // back and forth between idle/running, etc.
+ if !changed {
+ return
+ }
+
+ // Log whenever a run-probe reveals crunch-run processes
+ // appearing/disappearing before boot-probe succeeds.
+ if wkr.state == StateUnknown && changed {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(wkr.running),
+ "State": wkr.state,
+ }).Info("crunch-run probe succeeded, but boot probe is still failing")
+ }
+
+ if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+ wkr.state = StateRunning
+ } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+ wkr.state = StateIdle
+ }
+ wkr.updated = updateTime
+ if booted && (initialState == StateUnknown || initialState == StateBooting) {
+ wkr.reportBootOutcome(BootOutcomeSucceeded)
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(wkr.running),
+ "State": wkr.state,
+ }).Info("probes succeeded, instance is in service")
+ }
+ go wkr.wp.notify()
+}
+
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
+ cmd := wkr.wp.runnerCmd + " --list"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ before := time.Now()
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ if err != nil {
+ wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ }).WithError(err).Warn("probe failed")
+ wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
+ return
+ }
+ wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
+ ok = true
+
+ staleRunLock := false
+ for _, s := range strings.Split(string(stdout), "\n") {
+ // Each line of the "crunch-run --list" output is one
+ // of the following:
+ //
+ // * a container UUID, indicating that processes
+ // related to that container are currently running.
+ // Optionally followed by " stale", indicating that
+ // the crunch-run process itself has exited (the
+ // remaining process is probably arv-mount).
+ //
+ // * the string "broken", indicating that the instance
+ // appears incapable of starting containers.
+ //
+ // See ListProcesses() in lib/crunchrun/background.go.
+ if s == "" {
+ // empty string following final newline
+ } else if s == "broken" {
+ reportsBroken = true
+ } else if toks := strings.Split(s, " "); len(toks) == 1 {
+ running = append(running, s)
+ } else if toks[1] == "stale" {
+ wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+ staleRunLock = true
+ }
+ }
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ if !staleRunLock {
+ wkr.staleRunLockSince = time.Time{}
+ } else if wkr.staleRunLockSince.IsZero() {
+ wkr.staleRunLockSince = time.Now()
+ } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+ wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+ reportsBroken = true
+ }
+ return
+}
+
+func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+ cmd := wkr.wp.bootProbeCommand
+ if cmd == "" {
+ cmd = "true"
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ })
+ if err != nil {
+ logger.WithError(err).Debug("boot probe failed")
+ return false, stderr
+ }
+ logger.Info("boot probe succeeded")
+ if err = wkr.wp.loadRunnerData(); err != nil {
+ wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+ return false, stderr
+ } else if len(wkr.wp.runnerData) == 0 {
+ // Assume crunch-run is already installed
+ } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+ wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+ return false, stderr2
+ } else {
+ stderr = append(stderr, stderr2...)
+ }
+ return true, stderr
+}
+
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+ hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+ dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "hash": hash,
+ "path": wkr.wp.runnerCmd,
+ })
+
+ stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+ if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+" "+wkr.wp.runnerCmd+"\n")) {
+ logger.Info("runner binary already exists on worker, with correct hash")
+ return
+ }
+
+ // Note touch+chmod come before writing data, to avoid the
+ // possibility of md5 being correct while file mode is
+ // incorrect.
+ cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
+ if wkr.instance.RemoteUser() != "root" {
+ cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+ }
+ logger.WithField("cmd", cmd).Info("installing runner binary on worker")
+ stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+ return
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ // Never shut down.
+ return false
+ }
+ label, threshold := "", wkr.wp.timeoutProbe
+ if wkr.state == StateUnknown || wkr.state == StateBooting {
+ label, threshold = "new ", wkr.wp.timeoutBooting
+ }
+ if dur < threshold {
+ return false
+ }
+ wkr.logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "Since": wkr.probed,
+ "State": wkr.state,
+ }).Warnf("%sinstance unresponsive, shutting down", label)
+ wkr.shutdown()
+ return true
+}
+
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
+// caller must have lock.
+func (wkr *worker) eligibleForShutdown() bool {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ return false
+ }
+ draining := wkr.idleBehavior == IdleBehaviorDrain
+ switch wkr.state {
+ case StateBooting:
+ return draining
+ case StateIdle:
+ return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+ case StateRunning:
+ if !draining {
+ return false
+ }
+ for _, rr := range wkr.running {
+ if !rr.givenup {
+ return false
+ }
+ }
+ for _, rr := range wkr.starting {
+ if !rr.givenup {
+ return false
+ }
+ }
+ // draining, and all remaining runners are just trying
+ // to force-kill their crunch-run procs
+ return true
+ default:
+ return false
+ }
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+ if !wkr.eligibleForShutdown() {
+ return false
+ }
+ wkr.logger.WithFields(logrus.Fields{
+ "State": wkr.state,
+ "IdleDuration": stats.Duration(time.Since(wkr.busy)),
+ "IdleBehavior": wkr.idleBehavior,
+ }).Info("shutdown worker")
+ wkr.reportBootOutcome(BootOutcomeAborted)
+ wkr.shutdown()
+ return true
+}
+
+// caller must have lock.
+func (wkr *worker) shutdown() {
+ now := time.Now()
+ wkr.updated = now
+ wkr.destroyed = now
+ wkr.state = StateShutdown
+ go wkr.wp.notify()
+ go func() {
+ err := wkr.instance.Destroy()
+ if err != nil {
+ wkr.logger.WithError(err).Warn("shutdown failed")
+ return
+ }
+ }()
+}
+
+// Save worker tags to cloud provider metadata, if they don't already
+// match. Caller must have lock.
+func (wkr *worker) saveTags() {
+ instance := wkr.instance
+ tags := instance.Tags()
+ update := cloud.InstanceTags{
+ wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
+ wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
+ }
+ save := false
+ for k, v := range update {
+ if tags[k] != v {
+ tags[k] = v
+ save = true
+ }
+ }
+ if save {
+ go func() {
+ err := instance.SetTags(tags)
+ if err != nil {
+ wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
+ }
+ }()
+ }
+}
+
+func (wkr *worker) Close() {
+ // This might take time, so do it after unlocking mtx.
+ defer wkr.executor.Close()
+
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ for uuid, rr := range wkr.running {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+ for uuid, rr := range wkr.starting {
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+ rr.Close()
+ }
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+ alive := map[string]bool{}
+ for _, uuid := range ctrUUIDs {
+ alive[uuid] = true
+ if _, ok := wkr.running[uuid]; ok {
+ // unchanged
+ } else if rr, ok := wkr.starting[uuid]; ok {
+ wkr.running[uuid] = rr
+ delete(wkr.starting, uuid)
+ changed = true
+ } else {
+ // We didn't start it -- it must have been
+ // started by a previous dispatcher process.
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+ changed = true
+ }
+ }
+ for uuid := range wkr.running {
+ if !alive[uuid] {
+ wkr.closeRunner(uuid)
+ changed = true
+ }
+ }
+ return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+ rr := wkr.running[uuid]
+ if rr == nil {
+ return
+ }
+ wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ delete(wkr.running, uuid)
+ rr.Close()
+
+ now := time.Now()
+ wkr.updated = now
+ wkr.wp.exited[uuid] = now
+ if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+ wkr.state = StateIdle
+ }
+}