X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/93671366e7633cbf0ca3cab68395e211e3afc31c..15b7d1d4fa484641d846c1750487e640bfe4be09:/lib/dispatchcloud/worker/pool.go?ds=sidebyside diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 45e9da37ad..be66895a98 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,7 +5,6 @@ package worker import ( - "bytes" "io" "sort" "strings" @@ -137,22 +136,6 @@ type Pool struct { mMemoryInuse prometheus.Gauge } -type worker struct { - state State - instance cloud.Instance - executor Executor - instType arvados.InstanceType - vcpus int64 - memory int64 - probed time.Time - updated time.Time - busy time.Time - lastUUID string - running map[string]struct{} // remember to update state idle<->running when this changes - starting map[string]struct{} // remember to update state idle<->running when this changes - probing chan struct{} -} - // Subscribe returns a channel that becomes ready whenever a worker's // state changes. // @@ -264,13 +247,16 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" { initialState = StateHold } - wp.logger.WithFields(logrus.Fields{ + logger := wp.logger.WithFields(logrus.Fields{ "InstanceType": it.Name, "Instance": inst, - "State": initialState, - }).Infof("instance appeared in cloud") + }) + logger.WithField("State", initialState).Infof("instance appeared in cloud") now := time.Now() wp.workers[id] = &worker{ + mtx: &wp.mtx, + wp: wp, + logger: logger, executor: wp.newExecutor(inst), state: initialState, instance: inst, @@ -285,6 +271,11 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi return true } +// caller must have lock. +func (wp *Pool) notifyExited(uuid string, t time.Time) { + wp.exited[uuid] = t +} + // Shutdown shuts down a worker with the given type, or returns false // if all workers with the given type are busy. func (wp *Pool) Shutdown(it arvados.InstanceType) bool { @@ -298,9 +289,8 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { // time (Idle) or the earliest create time (Booting) for _, wkr := range wp.workers { if wkr.state == tryState && wkr.instType == it { - logger = logger.WithField("Instance", wkr.instance) - logger.Info("shutting down") - wp.shutdown(wkr, logger) + logger.WithField("Instance", wkr.instance).Info("shutting down") + wkr.shutdown() return true } } @@ -308,23 +298,6 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool { return false } -// caller must have lock -func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) { - wkr.updated = time.Now() - wkr.state = StateShutdown - go func() { - err := wkr.instance.Destroy() - if err != nil { - logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed") - return - } - wp.mtx.Lock() - wp.atQuotaUntil = time.Now() - wp.mtx.Unlock() - wp.notify() - }() -} - // Workers returns the current number of workers in each state. func (wp *Pool) Workers() map[State]int { wp.setupOnce.Do(wp.setup) @@ -360,11 +333,6 @@ func (wp *Pool) Running() map[string]time.Time { // StartContainer starts a container on an idle worker immediately if // possible, otherwise returns false. func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool { - logger := wp.logger.WithFields(logrus.Fields{ - "InstanceType": it.Name, - "ContainerUUID": ctr.UUID, - "Priority": ctr.Priority, - }) wp.setupOnce.Do(wp.setup) wp.mtx.Lock() defer wp.mtx.Unlock() @@ -379,34 +347,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b if wkr == nil { return false } - logger = logger.WithField("Instance", wkr.instance) - logger.Debug("starting container") - wkr.starting[ctr.UUID] = struct{}{} - wkr.state = StateRunning - go func() { - stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil) - wp.mtx.Lock() - defer wp.mtx.Unlock() - now := time.Now() - wkr.updated = now - wkr.busy = now - delete(wkr.starting, ctr.UUID) - wkr.running[ctr.UUID] = struct{}{} - wkr.lastUUID = ctr.UUID - if err != nil { - logger.WithField("stdout", string(stdout)). - WithField("stderr", string(stderr)). - WithError(err). - Error("error starting crunch-run process") - // Leave uuid in wkr.running, though: it's - // possible the error was just a communication - // failure and the process was in fact - // started. Wait for next probe to find out. - return - } - logger.Info("crunch-run process started") - wkr.lastUUID = ctr.UUID - }() + wkr.startContainer(ctr) return true } @@ -556,7 +497,7 @@ func (wp *Pool) runProbes() { workers = workers[:0] wp.mtx.Lock() for id, wkr := range wp.workers { - if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) { + if wkr.state == StateShutdown || wkr.shutdownIfIdle() { continue } workers = append(workers, id) @@ -567,20 +508,12 @@ func (wp *Pool) runProbes() { wp.mtx.Lock() wkr, ok := wp.workers[id] wp.mtx.Unlock() - if !ok || wkr.state == StateShutdown { - // Deleted/shutdown while we - // were probing others + if !ok { + // Deleted while we were probing + // others continue } - select { - case wkr.probing <- struct{}{}: - go func() { - wp.probeAndUpdate(wkr) - <-wkr.probing - }() - default: - wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish") - } + go wkr.ProbeAndUpdate() select { case <-wp.stop: return @@ -609,45 +542,6 @@ func (wp *Pool) runSync() { } } -// caller must have lock. -func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) { - if wkr.state == StateHold { - return - } - label, threshold := "", wp.timeoutProbe - if wkr.state == StateBooting { - label, threshold = "new ", wp.timeoutBooting - } - if dur < threshold { - return - } - wp.logger.WithFields(logrus.Fields{ - "Instance": wkr.instance, - "Duration": dur, - "Since": wkr.probed, - "State": wkr.state, - }).Warnf("%sinstance unresponsive, shutting down", label) - wp.shutdown(wkr, wp.logger) -} - -// caller must have lock. -func (wp *Pool) shutdownIfIdle(wkr *worker) bool { - if wkr.state != StateIdle { - return false - } - age := time.Since(wkr.busy) - if age < wp.timeoutIdle { - return false - } - logger := wp.logger.WithFields(logrus.Fields{ - "Age": age, - "Instance": wkr.instance, - }) - logger.Info("shutdown idle worker") - wp.shutdown(wkr, logger) - return true -} - // Stop synchronizing with the InstanceSet. func (wp *Pool) Stop() { wp.setupOnce.Do(wp.setup) @@ -753,146 +647,3 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { go wp.notify() } } - -// should be called in a new goroutine -func (wp *Pool) probeAndUpdate(wkr *worker) { - logger := wp.logger.WithField("Instance", wkr.instance) - wp.mtx.Lock() - updated := wkr.updated - needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle - needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting - wp.mtx.Unlock() - if !needProbeBooted && !needProbeRunning { - return - } - - var ( - ctrUUIDs []string - ok bool - stderr []byte - ) - if needProbeBooted { - ok, stderr = wp.probeBooted(wkr) - wp.mtx.Lock() - if ok || wkr.state == StateRunning || wkr.state == StateIdle { - logger.Info("instance booted; will try probeRunning") - needProbeRunning = true - } - wp.mtx.Unlock() - } - if needProbeRunning { - ctrUUIDs, ok, stderr = wp.probeRunning(wkr) - } - logger = logger.WithField("stderr", string(stderr)) - wp.mtx.Lock() - defer wp.mtx.Unlock() - if !ok { - if wkr.state == StateShutdown && wkr.updated.After(updated) { - // Skip the logging noise if shutdown was - // initiated during probe. - return - } - dur := time.Since(wkr.probed) - logger := logger.WithFields(logrus.Fields{ - "Duration": dur, - "State": wkr.state, - }) - if wkr.state == StateBooting { - logger.Debug("new instance not responding") - } else { - logger.Info("instance not responding") - } - wp.shutdownIfBroken(wkr, dur) - return - } - - updateTime := time.Now() - wkr.probed = updateTime - - if updated != wkr.updated { - // Worker was updated after the probe began, so - // wkr.running might have a container UUID that was - // not yet running when ctrUUIDs was generated. Leave - // wkr.running alone and wait for the next probe to - // catch up on any changes. - return - } - - if len(ctrUUIDs) > 0 { - wkr.busy = updateTime - wkr.lastUUID = ctrUUIDs[0] - } else if len(wkr.running) > 0 { - // Actual last-busy time was sometime between wkr.busy - // and now. Now is the earliest opportunity to take - // advantage of the non-busy state, though. - wkr.busy = updateTime - } - running := map[string]struct{}{} - changed := false - for _, uuid := range ctrUUIDs { - running[uuid] = struct{}{} - if _, ok := wkr.running[uuid]; !ok { - changed = true - } - } - for uuid := range wkr.running { - if _, ok := running[uuid]; !ok { - logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended") - wp.exited[uuid] = updateTime - changed = true - } - } - if wkr.state == StateUnknown || wkr.state == StateBooting { - wkr.state = StateIdle - changed = true - } - if changed { - wkr.running = running - if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 { - wkr.state = StateRunning - } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 { - wkr.state = StateIdle - } - wkr.updated = updateTime - go wp.notify() - } -} - -func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) { - cmd := "crunch-run --list" - stdout, stderr, err := wkr.executor.Execute(cmd, nil) - if err != nil { - wp.logger.WithFields(logrus.Fields{ - "Instance": wkr.instance, - "Command": cmd, - "stdout": string(stdout), - "stderr": string(stderr), - }).WithError(err).Warn("probe failed") - return nil, false, stderr - } - stdout = bytes.TrimRight(stdout, "\n") - if len(stdout) == 0 { - return nil, true, stderr - } - return strings.Split(string(stdout), "\n"), true, stderr -} - -func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) { - cmd := wp.bootProbeCommand - if cmd == "" { - cmd = "true" - } - stdout, stderr, err := wkr.executor.Execute(cmd, nil) - logger := wp.logger.WithFields(logrus.Fields{ - "Instance": wkr.instance, - "Command": cmd, - "stdout": string(stdout), - "stderr": string(stderr), - }) - if err != nil { - logger.WithError(err).Debug("boot probe failed") - return false, stderr - } - logger.Info("boot probe succeeded") - return true, stderr -}