X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/949d940b5f75114f86d381c347f0ba8f26b3e9b9..5155ef0eaae5a3cdef3d8c3490d138dfcbeaa736:/lib/dispatchcloud/worker/worker.go diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index c26186309a..03ab15176f 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -5,14 +5,20 @@ package worker import ( - "bytes" + "fmt" "strings" "sync" "time" "git.curoverse.com/arvados.git/lib/cloud" "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/Sirupsen/logrus" + "git.curoverse.com/arvados.git/sdk/go/stats" + "github.com/sirupsen/logrus" +) + +const ( + // TODO: configurable + maxPingFailTime = 10 * time.Minute ) // State indicates whether a worker is available to do work, and (if @@ -25,12 +31,6 @@ const ( StateIdle // instance booted, no containers are running StateRunning // instance is running one or more containers StateShutdown // worker has stopped monitoring the instance - StateHold // running, but not available to run new containers -) - -const ( - // TODO: configurable - maxPingFailTime = 10 * time.Minute ) var stateString = map[State]string{ @@ -39,7 +39,6 @@ var stateString = map[State]string{ StateIdle: "idle", StateRunning: "running", StateShutdown: "shutdown", - StateHold: "hold", } // String implements fmt.Stringer. @@ -53,26 +52,69 @@ func (s State) MarshalText() ([]byte, error) { return []byte(stateString[s]), nil } +// IdleBehavior indicates the behavior desired when a node becomes idle. +type IdleBehavior string + +const ( + IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout + IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers + IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle +) + +var validIdleBehavior = map[IdleBehavior]bool{ + IdleBehaviorRun: true, + IdleBehaviorHold: true, + IdleBehaviorDrain: true, +} + type worker struct { logger logrus.FieldLogger executor Executor wp *Pool - mtx sync.Locker // must be wp's Locker. - state State - instance cloud.Instance - instType arvados.InstanceType - vcpus int64 - memory int64 - appeared time.Time - probed time.Time - updated time.Time - busy time.Time - destroyed time.Time - lastUUID string - running map[string]struct{} // remember to update state idle<->running when this changes - starting map[string]struct{} // remember to update state idle<->running when this changes - probing chan struct{} + mtx sync.Locker // must be wp's Locker. 
+ state State + idleBehavior IdleBehavior + instance cloud.Instance + instType arvados.InstanceType + vcpus int64 + memory int64 + appeared time.Time + probed time.Time + updated time.Time + busy time.Time + destroyed time.Time + lastUUID string + running map[string]*remoteRunner // remember to update state idle<->running when this changes + starting map[string]*remoteRunner // remember to update state idle<->running when this changes + probing chan struct{} +} + +func (wkr *worker) onUnkillable(uuid string) { + wkr.mtx.Lock() + defer wkr.mtx.Unlock() + logger := wkr.logger.WithField("ContainerUUID", uuid) + if wkr.idleBehavior == IdleBehaviorHold { + logger.Warn("unkillable container, but worker has IdleBehavior=Hold") + return + } + logger.Warn("unkillable container, draining worker") + wkr.setIdleBehavior(IdleBehaviorDrain) +} + +func (wkr *worker) onKilled(uuid string) { + wkr.mtx.Lock() + defer wkr.mtx.Unlock() + wkr.closeRunner(uuid) + go wkr.wp.notify() +} + +// caller must have lock. +func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) { + wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior") + wkr.idleBehavior = idleBehavior + wkr.saveTags() + wkr.shutdownIfIdle() } // caller must have lock. @@ -81,32 +123,22 @@ func (wkr *worker) startContainer(ctr arvados.Container) { "ContainerUUID": ctr.UUID, "Priority": ctr.Priority, }) - logger = logger.WithField("Instance", wkr.instance) logger.Debug("starting container") - wkr.starting[ctr.UUID] = struct{}{} - wkr.state = StateRunning + rr := newRemoteRunner(ctr.UUID, wkr) + wkr.starting[ctr.UUID] = rr + if wkr.state != StateRunning { + wkr.state = StateRunning + go wkr.wp.notify() + } go func() { - stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil) + rr.Start() wkr.mtx.Lock() defer wkr.mtx.Unlock() now := time.Now() wkr.updated = now wkr.busy = now delete(wkr.starting, ctr.UUID) - wkr.running[ctr.UUID] = struct{}{} - wkr.lastUUID = ctr.UUID - if err != nil { - logger.WithField("stdout", string(stdout)). - WithField("stderr", string(stderr)). - WithError(err). - Error("error starting crunch-run process") - // Leave uuid in wkr.running, though: it's - // possible the error was just a communication - // failure and the process was in fact - // started. Wait for next probe to find out. - return - } - logger.Info("crunch-run process started") + wkr.running[ctr.UUID] = rr wkr.lastUUID = ctr.UUID }() } @@ -126,57 +158,99 @@ func (wkr *worker) ProbeAndUpdate() { } } -// should be called in a new goroutine +// probeAndUpdate calls probeBooted and/or probeRunning if needed, and +// updates state accordingly. +// +// In StateUnknown: Call both probeBooted and probeRunning. +// In StateBooting: Call probeBooted; if successful, call probeRunning. +// In StateRunning: Call probeRunning. +// In StateIdle: Call probeRunning. +// In StateShutdown: Do nothing. +// +// If both probes succeed, wkr.state changes to +// StateIdle/StateRunning. +// +// If probeRunning succeeds, wkr.running is updated. (This means +// wkr.running might be non-empty even in StateUnknown, if the boot +// probe failed.) +// +// probeAndUpdate should be called in a new goroutine. 
func (wkr *worker) probeAndUpdate() { wkr.mtx.Lock() updated := wkr.updated - needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle - needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting + initialState := wkr.state wkr.mtx.Unlock() - if !needProbeBooted && !needProbeRunning { - return - } var ( + booted bool ctrUUIDs []string ok bool - stderr []byte + stderr []byte // from probeBooted ) - if needProbeBooted { - ok, stderr = wkr.probeBooted() - wkr.mtx.Lock() - if ok || wkr.state == StateRunning || wkr.state == StateIdle { - wkr.logger.Info("instance booted; will try probeRunning") - needProbeRunning = true + + switch initialState { + case StateShutdown: + return + case StateIdle, StateRunning: + booted = true + case StateUnknown, StateBooting: + default: + panic(fmt.Sprintf("unknown state %s", initialState)) + } + + probeStart := time.Now() + logger := wkr.logger.WithField("ProbeStart", probeStart) + + if !booted { + booted, stderr = wkr.probeBooted() + if !booted { + // Pretend this probe succeeded if another + // concurrent attempt succeeded. + wkr.mtx.Lock() + booted = wkr.state == StateRunning || wkr.state == StateIdle + wkr.mtx.Unlock() + } + if booted { + logger.Info("instance booted; will try probeRunning") } - wkr.mtx.Unlock() } - if needProbeRunning { - ctrUUIDs, ok, stderr = wkr.probeRunning() + reportedBroken := false + if booted || wkr.state == StateUnknown { + ctrUUIDs, reportedBroken, ok = wkr.probeRunning() } - logger := wkr.logger.WithField("stderr", string(stderr)) wkr.mtx.Lock() defer wkr.mtx.Unlock() - if !ok { + if reportedBroken && wkr.idleBehavior == IdleBehaviorRun { + logger.Info("probe reported broken instance") + wkr.setIdleBehavior(IdleBehaviorDrain) + } + if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) { if wkr.state == StateShutdown && wkr.updated.After(updated) { // Skip the logging noise if shutdown was // initiated during probe. return } - dur := time.Since(wkr.probed) - logger := logger.WithFields(logrus.Fields{ - "Duration": dur, - "State": wkr.state, - }) - if wkr.state == StateBooting && !needProbeRunning { - // If we know the instance has never passed a - // boot probe, it's not noteworthy that it - // hasn't passed this probe. - logger.Debug("new instance not responding") - } else { - logger.Info("instance not responding") + // Using the start time of the probe as the timeout + // threshold ensures we always initiate at least one + // probe attempt after the boot/probe timeout expires + // (otherwise, a slow probe failure could cause us to + // shutdown an instance even though it did in fact + // boot/recover before the timeout expired). + dur := probeStart.Sub(wkr.probed) + if wkr.shutdownIfBroken(dur) { + // stderr from failed run-probes will have + // been logged already, but boot-probe + // failures are normal so they are logged only + // at Debug level. This is our chance to log + // some evidence about why the node never + // booted, even in non-debug mode. + if !booted { + logger.WithFields(logrus.Fields{ + "Duration": dur, + "stderr": string(stderr), + }).Info("boot failed") + } } - wkr.shutdownIfBroken(dur) return } @@ -201,53 +275,71 @@ func (wkr *worker) probeAndUpdate() { // advantage of the non-busy state, though. 
wkr.busy = updateTime } - running := map[string]struct{}{} - changed := false - for _, uuid := range ctrUUIDs { - running[uuid] = struct{}{} - if _, ok := wkr.running[uuid]; !ok { - changed = true - } + + changed := wkr.updateRunning(ctrUUIDs) + + // Update state if this was the first successful boot-probe. + if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) { + // Note: this will change again below if + // len(wkr.starting)+len(wkr.running) > 0. + wkr.state = StateIdle + changed = true } - for uuid := range wkr.running { - if _, ok := running[uuid]; !ok { - logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended") - wkr.wp.notifyExited(uuid, updateTime) - changed = true - } + + // If wkr.state and wkr.running aren't changing then there's + // no need to log anything, notify the scheduler, move state + // back and forth between idle/running, etc. + if !changed { + return } - if wkr.state == StateUnknown || wkr.state == StateBooting { + + // Log whenever a run-probe reveals crunch-run processes + // appearing/disappearing before boot-probe succeeds. + if wkr.state == StateUnknown && changed { + logger.WithFields(logrus.Fields{ + "RunningContainers": len(wkr.running), + "State": wkr.state, + }).Info("crunch-run probe succeeded, but boot probe is still failing") + } + + if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 { + wkr.state = StateRunning + } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 { wkr.state = StateIdle - changed = true } - if changed { - wkr.running = running - if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 { - wkr.state = StateRunning - } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 { - wkr.state = StateIdle - } - wkr.updated = updateTime - go wkr.wp.notify() + wkr.updated = updateTime + if booted && (initialState == StateUnknown || initialState == StateBooting) { + logger.WithFields(logrus.Fields{ + "RunningContainers": len(wkr.running), + "State": wkr.state, + }).Info("probes succeeded, instance is in service") } + go wkr.wp.notify() } -func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) { +func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { cmd := "crunch-run --list" - stdout, stderr, err := wkr.executor.Execute(cmd, nil) + if u := wkr.instance.RemoteUser(); u != "root" { + cmd = "sudo " + cmd + } + stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) if err != nil { wkr.logger.WithFields(logrus.Fields{ "Command": cmd, "stdout": string(stdout), "stderr": string(stderr), }).WithError(err).Warn("probe failed") - return nil, false, stderr + return } - stdout = bytes.TrimRight(stdout, "\n") - if len(stdout) == 0 { - return nil, true, stderr + ok = true + for _, s := range strings.Split(string(stdout), "\n") { + if s == "broken" { + reportsBroken = true + } else if s != "" { + running = append(running, s) + } } - return strings.Split(string(stdout), "\n"), true, stderr + return } func (wkr *worker) probeBooted() (ok bool, stderr []byte) { @@ -255,7 +347,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { if cmd == "" { cmd = "true" } - stdout, stderr, err := wkr.executor.Execute(cmd, nil) + stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil) logger := wkr.logger.WithFields(logrus.Fields{ "Command": cmd, "stdout": string(stdout), @@ -270,16 +362,17 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) { } // caller must have lock. 
-func (wkr *worker) shutdownIfBroken(dur time.Duration) { - if wkr.state == StateHold { - return +func (wkr *worker) shutdownIfBroken(dur time.Duration) bool { + if wkr.idleBehavior == IdleBehaviorHold { + // Never shut down. + return false } label, threshold := "", wkr.wp.timeoutProbe - if wkr.state == StateBooting { + if wkr.state == StateUnknown || wkr.state == StateBooting { label, threshold = "new ", wkr.wp.timeoutBooting } if dur < threshold { - return + return false } wkr.logger.WithFields(logrus.Fields{ "Duration": dur, @@ -287,23 +380,60 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) { "State": wkr.state, }).Warnf("%sinstance unresponsive, shutting down", label) wkr.shutdown() + return true } +// Returns true if the instance is eligible for shutdown: either it's +// been idle too long, or idleBehavior=Drain and nothing is running. +// // caller must have lock. -func (wkr *worker) shutdownIfIdle() bool { - if wkr.state != StateIdle { +func (wkr *worker) eligibleForShutdown() bool { + if wkr.idleBehavior == IdleBehaviorHold { return false } - age := time.Since(wkr.busy) - if age < wkr.wp.timeoutIdle { + draining := wkr.idleBehavior == IdleBehaviorDrain + switch wkr.state { + case StateBooting: + return draining + case StateIdle: + return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle + case StateRunning: + if !draining { + return false + } + for _, rr := range wkr.running { + if !rr.givenup { + return false + } + } + for _, rr := range wkr.starting { + if !rr.givenup { + return false + } + } + // draining, and all remaining runners are just trying + // to force-kill their crunch-run procs + return true + default: + return false + } +} + +// caller must have lock. +func (wkr *worker) shutdownIfIdle() bool { + if !wkr.eligibleForShutdown() { return false } - wkr.logger.WithField("Age", age).Info("shutdown idle worker") + wkr.logger.WithFields(logrus.Fields{ + "State": wkr.state, + "IdleDuration": stats.Duration(time.Since(wkr.busy)), + "IdleBehavior": wkr.idleBehavior, + }).Info("shutdown worker") wkr.shutdown() return true } -// caller must have lock +// caller must have lock. func (wkr *worker) shutdown() { now := time.Now() wkr.updated = now @@ -318,3 +448,94 @@ func (wkr *worker) shutdown() { } }() } + +// Save worker tags to cloud provider metadata, if they don't already +// match. Caller must have lock. +func (wkr *worker) saveTags() { + instance := wkr.instance + tags := instance.Tags() + update := cloud.InstanceTags{ + wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name, + wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior), + } + save := false + for k, v := range update { + if tags[k] != v { + tags[k] = v + save = true + } + } + if save { + go func() { + err := instance.SetTags(tags) + if err != nil { + wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags") + } + }() + } +} + +func (wkr *worker) Close() { + // This might take time, so do it after unlocking mtx. + defer wkr.executor.Close() + + wkr.mtx.Lock() + defer wkr.mtx.Unlock() + for uuid, rr := range wkr.running { + wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") + rr.Close() + } + for uuid, rr := range wkr.starting { + wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned") + rr.Close() + } +} + +// Add/remove entries in wkr.running to match ctrUUIDs returned by a +// probe. Returns true if anything was added or removed. +// +// Caller must have lock. 
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) { + alive := map[string]bool{} + for _, uuid := range ctrUUIDs { + alive[uuid] = true + if _, ok := wkr.running[uuid]; ok { + // unchanged + } else if rr, ok := wkr.starting[uuid]; ok { + wkr.running[uuid] = rr + delete(wkr.starting, uuid) + changed = true + } else { + // We didn't start it -- it must have been + // started by a previous dispatcher process. + wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected") + wkr.running[uuid] = newRemoteRunner(uuid, wkr) + changed = true + } + } + for uuid := range wkr.running { + if !alive[uuid] { + wkr.closeRunner(uuid) + changed = true + } + } + return +} + +// caller must have lock. +func (wkr *worker) closeRunner(uuid string) { + rr := wkr.running[uuid] + if rr == nil { + return + } + wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended") + delete(wkr.running, uuid) + rr.Close() + + now := time.Now() + wkr.updated = now + wkr.wp.exited[uuid] = now + if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 { + wkr.state = StateIdle + } +}
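
Note on the probe output format: the rewritten probeRunning above parses "crunch-run --list" line by line, treating a line that is exactly "broken" as a broken-node report and any other non-empty line as a running container UUID. Below is a minimal standalone sketch of that parsing, not part of the diff; parseListOutput and the sample UUID are hypothetical names used only for illustration.

package main

import (
	"fmt"
	"strings"
)

// parseListOutput mirrors the loop added in probeRunning: collect non-empty
// lines as container UUIDs, and flag the worker as broken if any line is
// exactly "broken".
func parseListOutput(stdout string) (running []string, reportsBroken bool) {
	for _, s := range strings.Split(stdout, "\n") {
		if s == "broken" {
			reportsBroken = true
		} else if s != "" {
			running = append(running, s)
		}
	}
	return
}

func main() {
	running, broken := parseListOutput("zzzzz-dz642-1234567890abcde\nbroken\n")
	fmt.Println(running, broken) // [zzzzz-dz642-1234567890abcde] true
}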
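
Note on IdleBehavior: the diff replaces the old StateHold state with a separate IdleBehavior axis (run/hold/drain) alongside State. The sketch below shows, under stated assumptions, how a caller such as a management-API handler (hypothetical here) might validate an idle-behavior string before calling setIdleBehavior. The type, constants, and validIdleBehavior map are copied from the diff so the example is self-contained; parseIdleBehavior itself is a hypothetical helper.

package main

import "fmt"

type IdleBehavior string

const (
	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
	IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
	IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
)

var validIdleBehavior = map[IdleBehavior]bool{
	IdleBehaviorRun:   true,
	IdleBehaviorHold:  true,
	IdleBehaviorDrain: true,
}

// parseIdleBehavior is not part of the diff; it only demonstrates using the
// validIdleBehavior map to reject unknown values.
func parseIdleBehavior(s string) (IdleBehavior, error) {
	ib := IdleBehavior(s)
	if !validIdleBehavior[ib] {
		return "", fmt.Errorf("invalid idle behavior %q (want run, hold, or drain)", s)
	}
	return ib, nil
}

func main() {
	ib, err := parseIdleBehavior("drain")
	fmt.Println(ib, err) // drain <nil>
}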
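
Note on saveTags: the new saveTags only calls instance.SetTags when at least one desired tag differs from what the instance already carries, avoiding needless cloud API writes. A small sketch of that change detection follows; tagsNeedUpdate is a hypothetical helper, the tag keys and values are illustrative, and cloud.InstanceTags is represented as a plain map[string]string.

package main

import "fmt"

// tagsNeedUpdate reports whether any desired tag is missing or different in
// the current tag set, mirroring the comparison loop in saveTags.
func tagsNeedUpdate(current, desired map[string]string) bool {
	for k, v := range desired {
		if current[k] != v {
			return true
		}
	}
	return false
}

func main() {
	current := map[string]string{"InstanceType": "m4.large"} // illustrative key/value only
	desired := map[string]string{"InstanceType": "m4.large", "IdleBehavior": "run"}
	fmt.Println(tagsNeedUpdate(current, desired)) // true: the IdleBehavior tag is not set yet
}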