X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3c538444c15e68e96780f157935f2baa4ba0bc5..c88ffa1a163c929ffa963af3eb1bcdbca1f6b6f2:/lib/dispatchcloud/worker/worker.go?ds=sidebyside diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 7828d4f69f..a75d2bbb88 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -5,7 +5,20 @@ package worker import ( + "bytes" + "fmt" + "strings" + "sync" "time" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" +) + +const ( + // TODO: configurable + maxPingFailTime = 10 * time.Minute ) // State indicates whether a worker is available to do work, and (if @@ -15,22 +28,17 @@ type State int const ( StateUnknown State = iota // might be running a container already StateBooting // instance is booting - StateRunning // instance is running + StateIdle // instance booted, no containers are running + StateRunning // instance is running one or more containers StateShutdown // worker has stopped monitoring the instance - StateHold // running, but not available to run new containers -) - -const ( - // TODO: configurable - maxPingFailTime = 10 * time.Minute ) var stateString = map[State]string{ StateUnknown: "unknown", StateBooting: "booting", + StateIdle: "idle", StateRunning: "running", StateShutdown: "shutdown", - StateHold: "hold", } // String implements fmt.Stringer. @@ -43,3 +51,361 @@ func (s State) String() string { func (s State) MarshalText() ([]byte, error) { return []byte(stateString[s]), nil } + +// IdleBehavior indicates the behavior desired when a node becomes idle. +type IdleBehavior string + +const ( + IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout + IdleBehaviorHold = "hold" // don't shutdown or run more containers + IdleBehaviorDrain = "drain" // shutdown immediately when idle +) + +var validIdleBehavior = map[IdleBehavior]bool{ + IdleBehaviorRun: true, + IdleBehaviorHold: true, + IdleBehaviorDrain: true, +} + +type worker struct { + logger logrus.FieldLogger + executor Executor + wp *Pool + + mtx sync.Locker // must be wp's Locker. + state State + idleBehavior IdleBehavior + instance cloud.Instance + instType arvados.InstanceType + vcpus int64 + memory int64 + appeared time.Time + probed time.Time + updated time.Time + busy time.Time + destroyed time.Time + lastUUID string + running map[string]struct{} // remember to update state idle<->running when this changes + starting map[string]struct{} // remember to update state idle<->running when this changes + probing chan struct{} +} + +// caller must have lock. +func (wkr *worker) startContainer(ctr arvados.Container) { + logger := wkr.logger.WithFields(logrus.Fields{ + "ContainerUUID": ctr.UUID, + "Priority": ctr.Priority, + }) + logger = logger.WithField("Instance", wkr.instance) + logger.Debug("starting container") + wkr.starting[ctr.UUID] = struct{}{} + wkr.state = StateRunning + go func() { + env := map[string]string{ + "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost, + "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken, + } + stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil) + wkr.mtx.Lock() + defer wkr.mtx.Unlock() + now := time.Now() + wkr.updated = now + wkr.busy = now + delete(wkr.starting, ctr.UUID) + wkr.running[ctr.UUID] = struct{}{} + wkr.lastUUID = ctr.UUID + if err != nil { + logger.WithField("stdout", string(stdout)). 
+ WithField("stderr", string(stderr)).
+ WithError(err).
+ Error("error starting crunch-run process")
+ // Leave uuid in wkr.running, though: it's
+ // possible the error was just a communication
+ // failure and the process was in fact
+ // started. Wait for next probe to find out.
+ return
+ }
+ logger.Info("crunch-run process started")
+ wkr.lastUUID = ctr.UUID
+ }()
+}
+
+// ProbeAndUpdate conducts appropriate boot/running probes (if any)
+// for the worker's current state. If a previous probe is still
+// running, it does nothing.
+//
+// It should be called in a new goroutine.
+func (wkr *worker) ProbeAndUpdate() {
+ select {
+ case wkr.probing <- struct{}{}:
+ wkr.probeAndUpdate()
+ <-wkr.probing
+ default:
+ wkr.logger.Debug("still waiting for last probe to finish")
+ }
+}
+
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
+func (wkr *worker) probeAndUpdate() {
+ wkr.mtx.Lock()
+ updated := wkr.updated
+ initialState := wkr.state
+ wkr.mtx.Unlock()
+
+ var (
+ booted bool
+ ctrUUIDs []string
+ ok bool
+ stderr []byte
+ )
+
+ switch initialState {
+ case StateShutdown:
+ return
+ case StateIdle, StateRunning:
+ booted = true
+ case StateUnknown, StateBooting:
+ default:
+ panic(fmt.Sprintf("unknown state %s", initialState))
+ }
+
+ if !booted {
+ booted, stderr = wkr.probeBooted()
+ if !booted {
+ // Pretend this probe succeeded if another
+ // concurrent attempt succeeded.
+ wkr.mtx.Lock()
+ booted = wkr.state == StateRunning || wkr.state == StateIdle
+ wkr.mtx.Unlock()
+ } else {
+ wkr.logger.Info("instance booted; will try probeRunning")
+ }
+ }
+ if booted || wkr.state == StateUnknown {
+ ctrUUIDs, ok, stderr = wkr.probeRunning()
+ }
+ logger := wkr.logger.WithField("stderr", string(stderr))
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
+ if wkr.state == StateShutdown && wkr.updated.After(updated) {
+ // Skip the logging noise if shutdown was
+ // initiated during probe.
+ return
+ }
+ dur := time.Since(wkr.probed)
+ logger := logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "State": wkr.state,
+ })
+ if !booted {
+ // While we're polling the VM to see if it's
+ // finished booting, failures are not
+ // noteworthy, so we log at Debug level.
+ logger.Debug("new instance not responding")
+ } else {
+ logger.Info("instance not responding")
+ }
+ wkr.shutdownIfBroken(dur)
+ return
+ }
+
+ updateTime := time.Now()
+ wkr.probed = updateTime
+
+ if updated != wkr.updated {
+ // Worker was updated after the probe began, so
+ // wkr.running might have a container UUID that was
+ // not yet running when ctrUUIDs was generated. Leave
+ // wkr.running alone and wait for the next probe to
+ // catch up on any changes.
+ return
+ }
+
+ if len(ctrUUIDs) > 0 {
+ wkr.busy = updateTime
+ wkr.lastUUID = ctrUUIDs[0]
+ } else if len(wkr.running) > 0 {
+ // Actual last-busy time was sometime between wkr.busy
+ // and now. Now is the earliest opportunity to take
+ // advantage of the non-busy state, though.
+ wkr.busy = updateTime
+ }
+ running := map[string]struct{}{}
+ changed := false
+ for _, uuid := range ctrUUIDs {
+ running[uuid] = struct{}{}
+ if _, ok := wkr.running[uuid]; !ok {
+ changed = true
+ }
+ }
+ for uuid := range wkr.running {
+ if _, ok := running[uuid]; !ok {
+ logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+ wkr.wp.notifyExited(uuid, updateTime)
+ changed = true
+ }
+ }
+ if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+ // Note: this will change again below if
+ // len(wkr.starting)+len(wkr.running) > 0.
+ wkr.state = StateIdle
+ changed = true
+ } else if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(running),
+ "State": wkr.state,
+ }).Info("crunch-run probe succeeded, but boot probe is still failing")
+ }
+ if !changed {
+ return
+ }
+
+ wkr.running = running
+ if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+ wkr.state = StateRunning
+ } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+ wkr.state = StateIdle
+ }
+ wkr.updated = updateTime
+ if booted && (initialState == StateUnknown || initialState == StateBooting) {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(running),
+ "State": wkr.state,
+ }).Info("probes succeeded, instance is in service")
+ }
+ go wkr.wp.notify()
+}
+
+func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+ cmd := "crunch-run --list"
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ if err != nil {
+ wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ }).WithError(err).Warn("probe failed")
+ return nil, false, stderr
+ }
+ stdout = bytes.TrimRight(stdout, "\n")
+ if len(stdout) == 0 {
+ return nil, true, stderr
+ }
+ return strings.Split(string(stdout), "\n"), true, stderr
+}
+
+func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+ cmd := wkr.wp.bootProbeCommand
+ if cmd == "" {
+ cmd = "true"
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ logger := wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ })
+ if err != nil {
+ logger.WithError(err).Debug("boot probe failed")
+ return false, stderr
+ }
+ logger.Info("boot probe succeeded")
+ return true, stderr
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfBroken(dur time.Duration) {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ return
+ }
+ label, threshold := "", wkr.wp.timeoutProbe
+ if wkr.state == StateUnknown || wkr.state == StateBooting {
+ label, threshold = "new ", wkr.wp.timeoutBooting
+ }
+ if dur < threshold {
+ return
+ }
+ wkr.logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "Since": wkr.probed,
+ "State": wkr.state,
+ }).Warnf("%sinstance unresponsive, shutting down", label)
+ wkr.shutdown()
+}
+
+// caller must have lock. 
+func (wkr *worker) shutdownIfIdle() bool { + if wkr.idleBehavior == IdleBehaviorHold { + return false + } + if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) { + return false + } + age := time.Since(wkr.busy) + if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle { + return false + } + wkr.logger.WithFields(logrus.Fields{ + "Age": age, + "IdleBehavior": wkr.idleBehavior, + }).Info("shutdown idle worker") + wkr.shutdown() + return true +} + +// caller must have lock. +func (wkr *worker) shutdown() { + now := time.Now() + wkr.updated = now + wkr.destroyed = now + wkr.state = StateShutdown + go wkr.wp.notify() + go func() { + err := wkr.instance.Destroy() + if err != nil { + wkr.logger.WithError(err).Warn("shutdown failed") + return + } + }() +} + +// Save worker tags to cloud provider metadata, if they don't already +// match. Caller must have lock. +func (wkr *worker) saveTags() { + instance := wkr.instance + have := instance.Tags() + want := cloud.InstanceTags{ + tagKeyInstanceType: wkr.instType.Name, + tagKeyIdleBehavior: string(wkr.idleBehavior), + } + go func() { + for k, v := range want { + if v == have[k] { + continue + } + err := instance.SetTags(want) + if err != nil { + wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags") + } + break + + } + }() +}
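
The non-blocking gate in ProbeAndUpdate above (try to send on wkr.probing, otherwise fall through to the default case) is what keeps at most one probe per worker in flight. The standalone sketch below is not part of this patch; it shows the same buffered-channel try-lock pattern in isolation. The names (gate, tryProbe, slowProbe) are hypothetical, and it assumes the channel is created with capacity 1, which is required for the send to succeed whenever no probe is running (the worker's own channel construction is not shown in this diff).

// Standalone sketch (hypothetical names, not part of this patch): a
// buffered channel of capacity 1 used as a try-lock, mirroring the
// select/default gate in worker.ProbeAndUpdate.
package main

import (
	"fmt"
	"sync"
	"time"
)

type gate struct {
	probing chan struct{} // capacity 1; holds a token while a probe runs
}

// tryProbe runs a probe only if no other probe is in flight;
// otherwise it returns immediately, like ProbeAndUpdate's default case.
func (g *gate) tryProbe(id int) {
	select {
	case g.probing <- struct{}{}:
		g.slowProbe(id)
		<-g.probing
	default:
		fmt.Printf("probe %d skipped: previous probe still running\n", id)
	}
}

func (g *gate) slowProbe(id int) {
	fmt.Printf("probe %d running\n", id)
	time.Sleep(50 * time.Millisecond)
}

func main() {
	g := &gate{probing: make(chan struct{}, 1)}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			g.tryProbe(i)
		}(i)
	}
	wg.Wait() // typically one goroutine probes; overlapping attempts are skipped
}

Because overlapping attempts are dropped rather than queued, a slow SSH probe cannot accumulate a backlog of waiting goroutines; the next scheduled probe simply tries again later.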