X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc952178056bf6d29471f6986306fb673dcf394a..be8ed479042df4fdefe1fd18c1e2e984e1c99bc0:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 85104a13aa..a75d2bbb88 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -6,13 +6,19 @@ package worker import ( "bytes" + "fmt" "strings" "sync" "time" "git.curoverse.com/arvados.git/lib/cloud" "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/Sirupsen/logrus" + "github.com/sirupsen/logrus" +) + +const ( + // TODO: configurable + maxPingFailTime = 10 * time.Minute ) // State indicates whether a worker is available to do work, and (if @@ -25,12 +31,6 @@ const ( StateIdle // instance booted, no containers are running StateRunning // instance is running one or more containers StateShutdown // worker has stopped monitoring the instance - StateHold // running, but not available to run new containers -) - -const ( - // TODO: configurable - maxPingFailTime = 10 * time.Minute ) var stateString = map[State]string{ @@ -39,7 +39,6 @@ var stateString = map[State]string{ StateIdle: "idle", StateRunning: "running", StateShutdown: "shutdown", - StateHold: "hold", } // String implements fmt.Stringer. @@ -53,25 +52,42 @@ func (s State) MarshalText() ([]byte, error) { return []byte(stateString[s]), nil } +// IdleBehavior indicates the behavior desired when a node becomes idle. +type IdleBehavior string + +const ( + IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout + IdleBehaviorHold = "hold" // don't shutdown or run more containers + IdleBehaviorDrain = "drain" // shutdown immediately when idle +) + +var validIdleBehavior = map[IdleBehavior]bool{ + IdleBehaviorRun: true, + IdleBehaviorHold: true, + IdleBehaviorDrain: true, +} + type worker struct { logger logrus.FieldLogger executor Executor wp *Pool - mtx sync.Locker // must be wp's Locker. - state State - instance cloud.Instance - instType arvados.InstanceType - vcpus int64 - memory int64 - probed time.Time - updated time.Time - busy time.Time - destroyed time.Time - lastUUID string - running map[string]struct{} // remember to update state idle<->running when this changes - starting map[string]struct{} // remember to update state idle<->running when this changes - probing chan struct{} + mtx sync.Locker // must be wp's Locker. + state State + idleBehavior IdleBehavior + instance cloud.Instance + instType arvados.InstanceType + vcpus int64 + memory int64 + appeared time.Time + probed time.Time + updated time.Time + busy time.Time + destroyed time.Time + lastUUID string + running map[string]struct{} // remember to update state idle<->running when this changes + starting map[string]struct{} // remember to update state idle<->running when this changes + probing chan struct{} } // caller must have lock. @@ -85,7 +101,11 @@ func (wkr *worker) startContainer(ctr arvados.Container) { wkr.starting[ctr.UUID] = struct{}{} wkr.state = StateRunning go func() { - stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil) + env := map[string]string{ + "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost, + "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken, + } + stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil) wkr.mtx.Lock() defer wkr.mtx.Unlock() now := time.Now() @@ -125,38 +145,65 @@ func (wkr *worker) ProbeAndUpdate() { } } -// should be called in a new goroutine +// probeAndUpdate calls probeBooted and/or probeRunning if needed, and +// updates state accordingly. +// +// In StateUnknown: Call both probeBooted and probeRunning. +// In StateBooting: Call probeBooted; if successful, call probeRunning. +// In StateRunning: Call probeRunning. +// In StateIdle: Call probeRunning. +// In StateShutdown: Do nothing. +// +// If both probes succeed, wkr.state changes to +// StateIdle/StateRunning. +// +// If probeRunning succeeds, wkr.running is updated. (This means +// wkr.running might be non-empty even in StateUnknown, if the boot +// probe failed.) +// +// probeAndUpdate should be called in a new goroutine. func (wkr *worker) probeAndUpdate() { wkr.mtx.Lock() updated := wkr.updated - needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle - needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting + initialState := wkr.state wkr.mtx.Unlock() - if !needProbeBooted && !needProbeRunning { - return - } var ( + booted bool ctrUUIDs []string ok bool stderr []byte ) - if needProbeBooted { - ok, stderr = wkr.probeBooted() - wkr.mtx.Lock() - if ok || wkr.state == StateRunning || wkr.state == StateIdle { + + switch initialState { + case StateShutdown: + return + case StateIdle, StateRunning: + booted = true + case StateUnknown, StateBooting: + default: + panic(fmt.Sprintf("unknown state %s", initialState)) + } + + if !booted { + booted, stderr = wkr.probeBooted() + if !booted { + // Pretend this probe succeeded if another + // concurrent attempt succeeded. + wkr.mtx.Lock() + booted = wkr.state == StateRunning || wkr.state == StateIdle + wkr.mtx.Unlock() + } else { wkr.logger.Info("instance booted; will try probeRunning") - needProbeRunning = true } - wkr.mtx.Unlock() } - if needProbeRunning { + if booted || wkr.state == StateUnknown { ctrUUIDs, ok, stderr = wkr.probeRunning() } logger := wkr.logger.WithField("stderr", string(stderr)) wkr.mtx.Lock() defer wkr.mtx.Unlock() - if !ok { + if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) { if wkr.state == StateShutdown && wkr.updated.After(updated) { // Skip the logging noise if shutdown was // initiated during probe. @@ -167,7 +214,10 @@ func (wkr *worker) probeAndUpdate() { "Duration": dur, "State": wkr.state, }) - if wkr.state == StateBooting { + if !booted { + // While we're polling the VM to see if it's + // finished booting, failures are not + // noteworthy, so we log at Debug level. logger.Debug("new instance not responding") } else { logger.Info("instance not responding") @@ -212,25 +262,40 @@ func (wkr *worker) probeAndUpdate() { changed = true } } - if wkr.state == StateUnknown || wkr.state == StateBooting { + if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) { + // Note: this will change again below if + // len(wkr.starting)+len(wkr.running) > 0. wkr.state = StateIdle changed = true + } else if wkr.state == StateUnknown && len(running) != len(wkr.running) { + logger.WithFields(logrus.Fields{ + "RunningContainers": len(running), + "State": wkr.state, + }).Info("crunch-run probe succeeded, but boot probe is still failing") } - if changed { - wkr.running = running - if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 { - wkr.state = StateRunning - } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 { - wkr.state = StateIdle - } - wkr.updated = updateTime - go wkr.wp.notify() + if !changed { + return + } + + wkr.running = running + if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 { + wkr.state = StateRunning + } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 { + wkr.state = StateIdle + } + wkr.updated = updateTime + if booted && (initialState == StateUnknown || initialState == StateBooting) { + logger.WithFields(logrus.Fields{ + "RunningContainers": len(running), + "State": wkr.state, + }).Info("probes succeeded, instance is in service") } + go wkr.wp.notify() } func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) { cmd := "crunch-run --list" - stdout, stderr, err := wkr.executor.Execute(cmd, nil) + stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) if err != nil { wkr.logger.WithFields(logrus.Fields{ "Command": cmd, @@ -251,7 +316,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { if cmd == "" { cmd = "true" } - stdout, stderr, err := wkr.executor.Execute(cmd, nil) + stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) logger := wkr.logger.WithFields(logrus.Fields{ "Command": cmd, "stdout": string(stdout), @@ -267,11 +332,11 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { // caller must have lock. func (wkr *worker) shutdownIfBroken(dur time.Duration) { - if wkr.state == StateHold { + if wkr.idleBehavior == IdleBehaviorHold { return } label, threshold := "", wkr.wp.timeoutProbe - if wkr.state == StateBooting { + if wkr.state == StateUnknown || wkr.state == StateBooting { label, threshold = "new ", wkr.wp.timeoutBooting } if dur < threshold { @@ -287,19 +352,25 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) { // caller must have lock. func (wkr *worker) shutdownIfIdle() bool { - if wkr.state != StateIdle { + if wkr.idleBehavior == IdleBehaviorHold { + return false + } + if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) { return false } age := time.Since(wkr.busy) - if age < wkr.wp.timeoutIdle { + if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle { return false } - wkr.logger.WithField("Age", age).Info("shutdown idle worker") + wkr.logger.WithFields(logrus.Fields{ + "Age": age, + "IdleBehavior": wkr.idleBehavior, + }).Info("shutdown idle worker") wkr.shutdown() return true } -// caller must have lock +// caller must have lock. func (wkr *worker) shutdown() { now := time.Now() wkr.updated = now @@ -314,3 +385,27 @@ func (wkr *worker) shutdown() { } }() } + +// Save worker tags to cloud provider metadata, if they don't already +// match. Caller must have lock. +func (wkr *worker) saveTags() { + instance := wkr.instance + have := instance.Tags() + want := cloud.InstanceTags{ + tagKeyInstanceType: wkr.instType.Name, + tagKeyIdleBehavior: string(wkr.idleBehavior), + } + go func() { + for k, v := range want { + if v == have[k] { + continue + } + err := instance.SetTags(want) + if err != nil { + wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags") + } + break + + } + }() +}