X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c81a653c1e800c40a0c6e1a5d94cddd6620b5e52..5d05b79a21ee85fa6ec53f9bc88e6ddb54029f04:/lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 364670544b..a43b96ed82 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,7 +5,6 @@
 package worker
 
 import (
-	"bytes"
 	"io"
 	"sort"
 	"strings"
@@ -18,14 +17,19 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-// A View shows a worker's current state and recent activity.
-type View struct {
+const (
+	tagKeyInstanceType = "InstanceType"
+	tagKeyHold         = "Hold"
+)
+
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
 	Instance             string
 	Price                float64
 	ArvadosInstanceType  string
 	ProviderInstanceType string
 	LastContainerUUID    string
-	Unallocated          time.Time
+	LastBusy             time.Time
 	WorkerState          string
 }
 
@@ -56,6 +60,7 @@ const (
 	defaultTimeoutIdle     = time.Minute
 	defaultTimeoutBooting  = time.Minute * 10
 	defaultTimeoutProbe    = time.Minute * 10
+	defaultTimeoutShutdown = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -84,9 +89,15 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 		timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
 		timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
 		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+		timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
 	}
 	wp.registerMetrics(reg)
-	go wp.run()
+	go func() {
+		wp.setupOnce.Do(wp.setup)
+		go wp.runMetrics()
+		go wp.runProbes()
+		go wp.runSync()
+	}()
 	return wp
 }
 
@@ -106,6 +117,7 @@ type Pool struct {
 	timeoutIdle     time.Duration
 	timeoutBooting  time.Duration
 	timeoutProbe    time.Duration
+	timeoutShutdown time.Duration
 
 	// private state
 	subscribers  map[<-chan struct{}]chan<- struct{}
@@ -114,6 +126,7 @@ type Pool struct {
 	loaded       bool                 // loaded list of instances from InstanceSet at least once
 	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
 	atQuotaUntil time.Time
+	atQuotaErr   cloud.QuotaError
 	stop         chan bool
 	mtx          sync.RWMutex
 	setupOnce    sync.Once
@@ -126,24 +139,6 @@ type Pool struct {
 	mMemoryInuse prometheus.Gauge
 }
 
-type worker struct {
-	state       State
-	instance    cloud.Instance
-	executor    Executor
-	instType    arvados.InstanceType
-	vcpus       int64
-	memory      int64
-	booted      bool
-	probed      time.Time
-	updated     time.Time
-	busy        time.Time
-	unallocated time.Time
-	lastUUID    string
-	running     map[string]struct{}
-	starting    map[string]struct{}
-	probing     chan struct{}
-}
-
 // Subscribe returns a channel that becomes ready whenever a worker's
 // state changes.
 //
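
A side note on the Subscribe/Unsubscribe API whose doc comment begins above: a receive on the returned channel only means "some worker changed state"; callers re-read whatever pool state they need. A minimal caller-side sketch -- illustrative only, not part of this patch, with invented names (exampleSchedulerLoop, runQueue) -- assuming it sits in the same package as Pool:

	// Illustrative sketch, not part of this diff.
	func exampleSchedulerLoop(wp *Pool, runQueue func()) {
		ch := wp.Subscribe()
		defer wp.Unsubscribe(ch)
		for range ch {
			// The channel carries no payload; re-inspect the pool
			// (Unallocated, CountWorkers, Running, ...) and act on it.
			runQueue()
		}
	}
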
@@ -176,6 +171,11 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 
 // Unallocated returns the number of unallocated (creating + booting +
 // idle + unknown) workers for each instance type.
+//
+// The returned counts should be interpreted as upper bounds, rather
+// than exact counts: they are sometimes artificially high when a
+// newly created instance appears in the driver's Instances() list
+// before the Create() call returns.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.RLock()
@@ -185,7 +185,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 		u[it] = c
 	}
 	for _, wkr := range wp.workers {
-		if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
+		if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
 			u[wkr.instType]++
 		}
 	}
@@ -200,19 +200,23 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
-	tags := cloud.InstanceTags{"InstanceType": it.Name}
+	if time.Now().Before(wp.atQuotaUntil) {
+		return wp.atQuotaErr
+	}
+	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
 	wp.creating[it]++
 	go func() {
+		defer wp.notify()
 		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
 		wp.creating[it]--
 		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+			wp.atQuotaErr = err
 			wp.atQuotaUntil = time.Now().Add(time.Minute)
 		}
 		if err != nil {
 			logger.WithError(err).Error("create failed")
-			go wp.notify()
 			return
 		}
 		wp.updateWorker(inst, it, StateBooting)
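
Unallocated, AtQuota, and Create above are the capacity-management surface of the pool. Purely as an illustration of the intended call pattern -- not part of this patch; maybeCreate and needed are invented names -- a caller in the same package might do:

	// Illustrative sketch, not part of this diff.
	func maybeCreate(wp *Pool, it arvados.InstanceType, needed int) {
		if wp.AtQuota() {
			// Create would fail fast right now with the saved quota error.
			return
		}
		if wp.Unallocated()[it] >= needed {
			// Enough instances of this type are already creating, booting,
			// idle, or unknown -- remembering this is only an upper bound.
			return
		}
		// Create returns quickly; the cloud API call runs in a goroutine
		// and failures are logged by the pool itself.
		wp.Create(it)
	}
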
@@ -223,44 +227,59 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 // AtQuota returns true if Create is not expected to work at the
 // moment.
 func (wp *Pool) AtQuota() bool {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
 	return time.Now().Before(wp.atQuotaUntil)
 }
 
 // Add or update worker attached to the given instance. Use
-// initialState if a new worker is created. Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
+// initialState if a new worker is created.
+//
+// The second return value is true if a new worker is created.
+//
+// Caller must have lock.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
 	id := inst.ID()
-	if wp.workers[id] != nil {
-		wp.workers[id].executor.SetTarget(inst)
-		wp.workers[id].instance = inst
-		wp.workers[id].updated = time.Now()
-		if initialState == StateBooting && wp.workers[id].state == StateUnknown {
-			wp.workers[id].state = StateBooting
+	if wkr := wp.workers[id]; wkr != nil {
+		wkr.executor.SetTarget(inst)
+		wkr.instance = inst
+		wkr.updated = time.Now()
+		if initialState == StateBooting && wkr.state == StateUnknown {
+			wkr.state = StateBooting
 		}
-		return
+		return wkr, false
 	}
-	if initialState == StateUnknown && inst.Tags()["hold"] != "" {
+	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
 		initialState = StateHold
 	}
-	wp.logger.WithFields(logrus.Fields{
+	logger := wp.logger.WithFields(logrus.Fields{
 		"InstanceType": it.Name,
 		"Instance":     inst,
-		"State":        initialState,
-	}).Infof("instance appeared in cloud")
-	wp.workers[id] = &worker{
-		executor:    wp.newExecutor(inst),
-		state:       initialState,
-		instance:    inst,
-		instType:    it,
-		probed:      time.Now(),
-		busy:        time.Now(),
-		updated:     time.Now(),
-		unallocated: time.Now(),
-		running:     make(map[string]struct{}),
-		starting:    make(map[string]struct{}),
-		probing:     make(chan struct{}, 1),
-	}
-	go wp.notify()
+	})
+	logger.WithField("State", initialState).Infof("instance appeared in cloud")
+	now := time.Now()
+	wkr := &worker{
+		mtx:      &wp.mtx,
+		wp:       wp,
+		logger:   logger,
+		executor: wp.newExecutor(inst),
+		state:    initialState,
+		instance: inst,
+		instType: it,
+		probed:   now,
+		busy:     now,
+		updated:  now,
+		running:  make(map[string]struct{}),
+		starting: make(map[string]struct{}),
+		probing:  make(chan struct{}, 1),
+	}
+	wp.workers[id] = wkr
+	return wkr, true
+}
+
+// caller must have lock.
+func (wp *Pool) notifyExited(uuid string, t time.Time) {
+	wp.exited[uuid] = t
 }
 
 // Shutdown shuts down a worker with the given type, or returns false
@@ -271,45 +290,22 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 	defer wp.mtx.Unlock()
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	logger.Info("shutdown requested")
-	for _, tryState := range []State{StateBooting, StateRunning} {
+	for _, tryState := range []State{StateBooting, StateIdle} {
 		// TODO: shutdown the worker with the longest idle
-		// time (Running) or the earliest create time
-		// (Booting)
+		// time (Idle) or the earliest create time (Booting)
 		for _, wkr := range wp.workers {
-			if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
-				continue
-			}
-			if wkr.instType != it {
-				continue
+			if wkr.state == tryState && wkr.instType == it {
+				logger.WithField("Instance", wkr.instance).Info("shutting down")
+				wkr.shutdown()
+				return true
 			}
-			logger = logger.WithField("Instance", wkr.instance)
-			logger.Info("shutting down")
-			wp.shutdown(wkr, logger)
-			return true
 		}
 	}
 	return false
 }
 
-// caller must have lock
-func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
-	wkr.updated = time.Now()
-	wkr.state = StateShutdown
-	go func() {
-		err := wkr.instance.Destroy()
-		if err != nil {
-			logger.WithError(err).Warn("shutdown failed")
-			return
-		}
-		wp.mtx.Lock()
-		wp.atQuotaUntil = time.Now()
-		wp.mtx.Unlock()
-		wp.notify()
-	}()
-}
-
-// Workers returns the current number of workers in each state.
-func (wp *Pool) Workers() map[State]int {
+// CountWorkers returns the current number of workers in each state.
+func (wp *Pool) CountWorkers() map[State]int {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
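
CountWorkers (renamed from Workers above) gives an aggregate view keyed by State. As an illustration only -- logPoolState is an invented name; the State constants are the ones already used in this file -- a caller could turn it into a log line:

	// Illustrative sketch, not part of this diff.
	func logPoolState(logger logrus.FieldLogger, wp *Pool) {
		counts := wp.CountWorkers()
		logger.WithFields(logrus.Fields{
			"Booting":  counts[StateBooting],
			"Idle":     counts[StateIdle],
			"Running":  counts[StateRunning],
			"Hold":     counts[StateHold],
			"Shutdown": counts[StateShutdown],
		}).Info("worker pool state")
	}
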
@@ -343,17 +339,12 @@ func (wp *Pool) Running() map[string]time.Time {
 // StartContainer starts a container on an idle worker immediately if
 // possible, otherwise returns false.
 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
-	logger := wp.logger.WithFields(logrus.Fields{
-		"InstanceType":  it.Name,
-		"ContainerUUID": ctr.UUID,
-		"Priority":      ctr.Priority,
-	})
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	var wkr *worker
 	for _, w := range wp.workers {
-		if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+		if w.instType == it && w.state == StateIdle {
 			if wkr == nil || w.busy.After(wkr.busy) {
 				wkr = w
 			}
@@ -362,35 +353,15 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 	if wkr == nil {
 		return false
 	}
-	logger = logger.WithField("Instance", wkr.instance)
-	logger.Debug("starting container")
-	wkr.starting[ctr.UUID] = struct{}{}
-	go func() {
-		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
-		wp.mtx.Lock()
-		defer wp.mtx.Unlock()
-		wkr.updated = time.Now()
-		delete(wkr.starting, ctr.UUID)
-		wkr.running[ctr.UUID] = struct{}{}
-		if err != nil {
-			logger.WithField("stdout", string(stdout)).
-				WithField("stderr", string(stderr)).
-				WithError(err).
-				Error("error starting crunch-run process")
-			// Leave uuid in wkr.running, though: it's
-			// possible the error was just a communication
-			// failure and the process was in fact
-			// started. Wait for next probe to find out.
-			return
-		}
-		logger.Info("crunch-run process started")
-		wkr.lastUUID = ctr.UUID
-	}()
+	wkr.startContainer(ctr)
 	return true
 }
 
 // KillContainer kills the crunch-run process for the given container
 // UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
 func (wp *Pool) KillContainer(uuid string) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
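
StartContainer and KillContainer above are both non-blocking: StartContainer only succeeds if an idle worker of the right type already exists, and KillContainer returns before the kill is confirmed. A hypothetical caller-side sketch (tryRun and cancel are invented names; error handling elided):

	// Illustrative sketch, not part of this diff.
	func tryRun(wp *Pool, it arvados.InstanceType, ctr arvados.Container) {
		if !wp.StartContainer(it, ctr) {
			// No idle worker of this type right now: request capacity and
			// retry after the next pool notification (see Subscribe).
			wp.Create(it)
		}
	}

	func cancel(wp *Pool, uuid string) {
		// Returns immediately; the pool drops the UUID from the worker's
		// running set once the kill command succeeds.
		wp.KillContainer(uuid)
	}
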
@@ -414,7 +385,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
 		"Instance":      wkr.instance,
 	})
 	logger.Debug("killing process")
-	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
+	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
 	if err != nil {
 		logger.WithFields(logrus.Fields{
 			"stderr": string(stderr),
@@ -428,6 +399,9 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
 	defer wp.mtx.Unlock()
 	if _, ok := wkr.running[uuid]; ok {
 		delete(wkr.running, uuid)
+		if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+			wkr.state = StateIdle
+		}
 		wkr.updated = time.Now()
 		go wp.notify()
 	}
@@ -482,6 +456,14 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 	reg.MustRegister(wp.mMemoryInuse)
 }
 
+func (wp *Pool) runMetrics() {
+	ch := wp.Subscribe()
+	defer wp.Unsubscribe(ch)
+	for range ch {
+		wp.updateMetrics()
+	}
+}
+
 func (wp *Pool) updateMetrics() {
 	wp.mtx.RLock()
 	defer wp.mtx.RUnlock()
@@ -505,67 +487,49 @@ func (wp *Pool) updateMetrics() {
 	wp.mMemoryInuse.Set(float64(memInuse))
 }
 
-func (wp *Pool) run() {
-	wp.setupOnce.Do(wp.setup)
+func (wp *Pool) runProbes() {
+	maxPPS := wp.maxProbesPerSecond
+	if maxPPS < 1 {
+		maxPPS = defaultMaxProbesPerSecond
+	}
+	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+	defer limitticker.Stop()
 
-	go func() {
-		ch := wp.Subscribe()
-		defer wp.Unsubscribe(ch)
-		for range ch {
-			wp.updateMetrics()
-		}
-	}()
+	probeticker := time.NewTicker(wp.probeInterval)
+	defer probeticker.Stop()
 
-	go func() {
-		maxPPS := wp.maxProbesPerSecond
-		if maxPPS < 1 {
-			maxPPS = defaultMaxProbesPerSecond
+	workers := []cloud.InstanceID{}
+	for range probeticker.C {
+		workers = workers[:0]
+		wp.mtx.Lock()
+		for id, wkr := range wp.workers {
+			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
+				continue
+			}
+			workers = append(workers, id)
 		}
-		limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
-		defer limitticker.Stop()
-
-		probeticker := time.NewTicker(wp.probeInterval)
-		defer probeticker.Stop()
+		wp.mtx.Unlock()
 
-		workers := []cloud.InstanceID{}
-		for range probeticker.C {
-			workers = workers[:0]
+		for _, id := range workers {
 			wp.mtx.Lock()
-			for id, wkr := range wp.workers {
-				if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
-					continue
-				}
-				workers = append(workers, id)
-			}
+			wkr, ok := wp.workers[id]
 			wp.mtx.Unlock()
-
-			for _, id := range workers {
-				wp.mtx.Lock()
-				wkr, ok := wp.workers[id]
-				wp.mtx.Unlock()
-				if !ok || wkr.state == StateShutdown {
-					// Deleted/shutdown while we
-					// were probing others
-					continue
-				}
-				select {
-				case wkr.probing <- struct{}{}:
-					go func() {
-						wp.probeAndUpdate(wkr)
-						<-wkr.probing
-					}()
-				default:
-					wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
-				}
-				select {
-				case <-wp.stop:
-					return
-				case <-limitticker.C:
-				}
+			if !ok {
+				// Deleted while we were probing
+				// others
+				continue
+			}
+			go wkr.ProbeAndUpdate()
+			select {
+			case <-wp.stop:
+				return
+			case <-limitticker.C:
 			}
 		}
-	}()
+	}
+}
 
+func (wp *Pool) runSync() {
 	// sync once immediately, then wait syncInterval, sync again,
 	// etc.
 	timer := time.NewTimer(1)
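
The new runProbes above combines two tickers: probeticker starts a probe round every probeInterval, while limitticker spaces the individual probes out to at most maxPPS per second. The same pattern in isolation, with stand-in names and only the standard library -- illustrative only, not part of this patch:

	// Illustrative sketch, not part of this diff.
	func rateLimitedRounds(interval time.Duration, maxPerSecond int, items func() []string, probe func(string), stop <-chan bool) {
		limit := time.NewTicker(time.Second / time.Duration(maxPerSecond))
		defer limit.Stop()
		round := time.NewTicker(interval)
		defer round.Stop()
		for range round.C {
			for _, it := range items() {
				probe(it)
				select {
				case <-stop:
					return
				case <-limit.C:
					// wait here so probes stay under the per-second cap
				}
			}
		}
	}
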
@@ -584,43 +548,26 @@ func (wp *Pool) run() {
 	}
 }
 
-// caller must have lock.
-func (wp *Pool) autoShutdown(wkr *worker) bool {
-	if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
-		return false
-	}
-	age := time.Since(wkr.unallocated)
-	if age < wp.timeoutIdle {
-		return false
-	}
-	logger := wp.logger.WithFields(logrus.Fields{
-		"Age":      age,
-		"Instance": wkr.instance,
-	})
-	logger.Info("shutdown idle worker")
-	wp.shutdown(wkr, logger)
-	return true
-}
-
 // Stop synchronizing with the InstanceSet.
 func (wp *Pool) Stop() {
 	wp.setupOnce.Do(wp.setup)
 	close(wp.stop)
 }
 
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
-	var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+	var r []InstanceView
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	for _, w := range wp.workers {
-		r = append(r, View{
+		r = append(r, InstanceView{
 			Instance:             w.instance.String(),
 			Price:                w.instType.Price,
 			ArvadosInstanceType:  w.instType.Name,
 			ProviderInstanceType: w.instType.ProviderType,
 			LastContainerUUID:    w.lastUUID,
-			Unallocated:          w.unallocated,
+			LastBusy:             w.busy,
 			WorkerState:          w.state.String(),
 		})
 	}
@@ -669,15 +616,21 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+	notify := false
 	for _, inst := range instances {
-		itTag := inst.Tags()["InstanceType"]
+		itTag := inst.Tags()[tagKeyInstanceType]
 		it, ok := wp.instanceTypes[itTag]
 		if !ok {
 			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
 			continue
 		}
-		wp.updateWorker(inst, it, StateUnknown)
+		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+			notify = true
+		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+			wkr.shutdown()
+		}
 	}
 
 	for id, wkr := range wp.workers {
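
Instances (replacing View above) is the pool's externally visible status report. One plausible consumer -- a hypothetical HTTP status handler, assuming the usual net/http and encoding/json imports, which this file does not itself have -- could expose it directly:

	// Illustrative sketch, not part of this diff.
	func statusHandler(wp *Pool) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			_ = json.NewEncoder(w).Encode(map[string]interface{}{
				"Instances": wp.Instances(),
			})
		}
	}
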
@@ -691,158 +644,15 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 			logger.Info("instance disappeared in cloud")
 			delete(wp.workers, id)
 			go wkr.executor.Close()
-			go wp.notify()
+			notify = true
 		}
 	}
 
 	if !wp.loaded {
 		wp.loaded = true
 		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
 	}
-}
-
-// should be called in a new goroutine
-func (wp *Pool) probeAndUpdate(wkr *worker) {
-	logger := wp.logger.WithField("Instance", wkr.instance)
-	wp.mtx.Lock()
-	updated := wkr.updated
-	booted := wkr.booted
-	wp.mtx.Unlock()
-
-	var (
-		ctrUUIDs []string
-		ok       bool
-		stderr   []byte
-	)
-	if !booted {
-		booted, stderr = wp.probeBooted(wkr)
-		wp.mtx.Lock()
-		if booted && !wkr.booted {
-			wkr.booted = booted
-			logger.Info("instance booted")
-		} else {
-			booted = wkr.booted
-		}
-		wp.mtx.Unlock()
-	}
-	if booted {
-		ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
-	}
-	logger = logger.WithField("stderr", string(stderr))
-	wp.mtx.Lock()
-	defer wp.mtx.Unlock()
-	if !ok {
-		if wkr.state == StateShutdown {
-			return
-		}
-		dur := time.Since(wkr.probed)
-		logger := logger.WithFields(logrus.Fields{
-			"Duration": dur,
-			"State":    wkr.state,
-		})
-		if wkr.state == StateBooting {
-			logger.Debug("new instance not responding")
-		} else {
-			logger.Info("instance not responding")
-		}
-		if wkr.state == StateHold {
-			return
-		}
-
-		label, threshold := "", wp.timeoutProbe
-		if wkr.state == StateBooting {
-			label, threshold = "new ", wp.timeoutBooting
-		}
-		if dur > threshold {
-			logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
-			wp.shutdown(wkr, logger)
-		}
-		return
-	}
-
-	updateTime := time.Now()
-	wkr.probed = updateTime
-	if len(ctrUUIDs) > 0 {
-		wkr.busy = updateTime
-		wkr.lastUUID = ctrUUIDs[0]
-	}
-	if wkr.state == StateShutdown || wkr.state == StateHold {
-	} else if booted {
-		if wkr.state != StateRunning {
-			wkr.state = StateRunning
-			go wp.notify()
-		}
-	} else {
-		wkr.state = StateBooting
-	}
-
-	if updated != wkr.updated {
-		// Worker was updated (e.g., by starting a new
-		// container) after the probe began. Avoid clobbering
-		// those changes with the probe results.
-		return
-	}
-
-	if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
-		wkr.unallocated = updateTime
-	}
-	running := map[string]struct{}{}
-	changed := false
-	for _, uuid := range ctrUUIDs {
-		running[uuid] = struct{}{}
-		if _, ok := wkr.running[uuid]; !ok {
-			changed = true
-		}
-	}
-	for uuid := range wkr.running {
-		if _, ok := running[uuid]; !ok {
-			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-			wp.exited[uuid] = updateTime
-			changed = true
-		}
-	}
-	if changed {
-		wkr.running = running
-		wkr.updated = updateTime
+	if notify {
 		go wp.notify()
 	}
 }
-
-func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
-	cmd := "crunch-run --list"
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-	if err != nil {
-		wp.logger.WithFields(logrus.Fields{
-			"Instance": wkr.instance,
-			"Command":  cmd,
-			"stdout":   string(stdout),
-			"stderr":   string(stderr),
-		}).WithError(err).Warn("probe failed")
-		return nil, false, stderr
-	}
-	stdout = bytes.TrimRight(stdout, "\n")
-	if len(stdout) == 0 {
-		return nil, true, stderr
-	}
-	return strings.Split(string(stdout), "\n"), true, stderr
-}
-
-func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
-	cmd := wp.bootProbeCommand
-	if cmd == "" {
-		cmd = "true"
-	}
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
-	logger := wp.logger.WithFields(logrus.Fields{
-		"Instance": wkr.instance,
-		"Command":  cmd,
-		"stdout":   string(stdout),
-		"stderr":   string(stderr),
-	})
-	if err != nil {
-		logger.WithError(err).Debug("boot probe failed")
-		return false, stderr
-	}
-	logger.Info("boot probe succeeded")
-	return true, stderr
-}
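
Most of the logic removed in this last hunk (per-worker probing, idle shutdown, the crunch-run --list and boot-probe commands) does not disappear; the pool now calls methods on the worker type instead. The worker implementation is in a separate file not shown in this diff, but the surface the pool relies on can be inferred from the call sites above, summarized here as a hypothetical interface (workerAPI is an invented name):

	// Hypothetical summary, inferred only from call sites in this diff.
	type workerAPI interface {
		startContainer(ctr arvados.Container) // replaces the goroutine removed from StartContainer
		shutdown()                            // replaces the removed Pool.shutdown
		shutdownIfIdle() bool                 // replaces the removed Pool.autoShutdown
		ProbeAndUpdate()                      // replaces the removed Pool.probeAndUpdate
	}

The sync changes also read wkr.destroyed, and InstanceView now reports wkr.busy instead of the old unallocated timestamp, so the worker presumably tracks those fields as well.
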