X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c81a653c1e800c40a0c6e1a5d94cddd6620b5e52..a5fe34a8cd05c4e55deaec599347f65e6d662d22:/lib/dispatchcloud/worker/pool.go

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 364670544b..1e759e38f3 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -18,8 +18,13 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-// A View shows a worker's current state and recent activity.
-type View struct {
+const (
+	tagKeyInstanceType = "InstanceType"
+	tagKeyHold         = "Hold"
+)
+
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
 	Instance             string
 	Price                float64
 	ArvadosInstanceType  string
@@ -86,7 +91,12 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 		timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
 	}
 	wp.registerMetrics(reg)
-	go wp.run()
+	go func() {
+		wp.setupOnce.Do(wp.setup)
+		go wp.runMetrics()
+		go wp.runProbes()
+		go wp.runSync()
+	}()
 	return wp
 }
 
@@ -200,9 +210,10 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
-	tags := cloud.InstanceTags{"InstanceType": it.Name}
+	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
 	wp.creating[it]++
 	go func() {
+		defer wp.notify()
 		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
@@ -212,7 +223,6 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 		}
 		if err != nil {
 			logger.WithError(err).Error("create failed")
-			go wp.notify()
 			return
 		}
 		wp.updateWorker(inst, it, StateBooting)
@@ -223,12 +233,16 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 // AtQuota returns true if Create is not expected to work at the
 // moment.
 func (wp *Pool) AtQuota() bool {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
 	return time.Now().Before(wp.atQuotaUntil)
 }
 
 // Add or update worker attached to the given instance. Use
 // initialState if a new worker is created. Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
+//
+// Returns true when a new worker is created.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
 	id := inst.ID()
 	if wp.workers[id] != nil {
 		wp.workers[id].executor.SetTarget(inst)
@@ -237,9 +251,9 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		if initialState == StateBooting && wp.workers[id].state == StateUnknown {
 			wp.workers[id].state = StateBooting
 		}
-		return
+		return false
 	}
-	if initialState == StateUnknown && inst.Tags()["hold"] != "" {
+	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
 		initialState = StateHold
 	}
 	wp.logger.WithFields(logrus.Fields{
@@ -247,20 +261,21 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		"Instance":     inst,
 		"State":        initialState,
 	}).Infof("instance appeared in cloud")
+	now := time.Now()
 	wp.workers[id] = &worker{
 		executor:    wp.newExecutor(inst),
 		state:       initialState,
 		instance:    inst,
 		instType:    it,
-		probed:      time.Now(),
-		busy:        time.Now(),
-		updated:     time.Now(),
-		unallocated: time.Now(),
+		probed:      now,
+		busy:        now,
+		updated:     now,
+		unallocated: now,
 		running:     make(map[string]struct{}),
 		starting:    make(map[string]struct{}),
 		probing:     make(chan struct{}, 1),
 	}
-	go wp.notify()
+	return true
 }
 
 // Shutdown shuts down a worker with the given type, or returns false
@@ -298,7 +313,7 @@ func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
 	go func() {
 		err := wkr.instance.Destroy()
 		if err != nil {
-			logger.WithError(err).Warn("shutdown failed")
+			logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
 			return
 		}
 		wp.mtx.Lock()
@@ -369,9 +384,12 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
-		wkr.updated = time.Now()
+		now := time.Now()
+		wkr.updated = now
+		wkr.busy = now
 		delete(wkr.starting, ctr.UUID)
 		wkr.running[ctr.UUID] = struct{}{}
+		wkr.lastUUID = ctr.UUID
 		if err != nil {
 			logger.WithField("stdout", string(stdout)).
 				WithField("stderr", string(stderr)).
@@ -391,6 +409,9 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 
 // KillContainer kills the crunch-run process for the given container
 // UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
 func (wp *Pool) KillContainer(uuid string) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
@@ -482,6 +503,14 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 	reg.MustRegister(wp.mMemoryInuse)
 }
 
+func (wp *Pool) runMetrics() {
+	ch := wp.Subscribe()
+	defer wp.Unsubscribe(ch)
+	for range ch {
+		wp.updateMetrics()
+	}
+}
+
 func (wp *Pool) updateMetrics() {
 	wp.mtx.RLock()
 	defer wp.mtx.RUnlock()
@@ -505,67 +534,57 @@ func (wp *Pool) updateMetrics() {
 	wp.mMemoryInuse.Set(float64(memInuse))
 }
 
-func (wp *Pool) run() {
-	wp.setupOnce.Do(wp.setup)
+func (wp *Pool) runProbes() {
+	maxPPS := wp.maxProbesPerSecond
+	if maxPPS < 1 {
+		maxPPS = defaultMaxProbesPerSecond
+	}
+	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+	defer limitticker.Stop()
 
-	go func() {
-		ch := wp.Subscribe()
-		defer wp.Unsubscribe(ch)
-		for range ch {
-			wp.updateMetrics()
-		}
-	}()
+	probeticker := time.NewTicker(wp.probeInterval)
+	defer probeticker.Stop()
 
-	go func() {
-		maxPPS := wp.maxProbesPerSecond
-		if maxPPS < 1 {
-			maxPPS = defaultMaxProbesPerSecond
+	workers := []cloud.InstanceID{}
+	for range probeticker.C {
+		workers = workers[:0]
+		wp.mtx.Lock()
+		for id, wkr := range wp.workers {
+			if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
+				continue
+			}
+			workers = append(workers, id)
 		}
-		limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
-		defer limitticker.Stop()
-
-		probeticker := time.NewTicker(wp.probeInterval)
-		defer probeticker.Stop()
+		wp.mtx.Unlock()
 
-		workers := []cloud.InstanceID{}
-		for range probeticker.C {
-			workers = workers[:0]
+		for _, id := range workers {
 			wp.mtx.Lock()
-			for id, wkr := range wp.workers {
-				if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
-					continue
-				}
-				workers = append(workers, id)
-			}
+			wkr, ok := wp.workers[id]
 			wp.mtx.Unlock()
-
-			for _, id := range workers {
-				wp.mtx.Lock()
-				wkr, ok := wp.workers[id]
-				wp.mtx.Unlock()
-				if !ok || wkr.state == StateShutdown {
-					// Deleted/shutdown while we
-					// were probing others
-					continue
-				}
-				select {
-				case wkr.probing <- struct{}{}:
-					go func() {
-						wp.probeAndUpdate(wkr)
-						<-wkr.probing
-					}()
-				default:
-					wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
-				}
-				select {
-				case <-wp.stop:
-					return
-				case <-limitticker.C:
-				}
+			if !ok || wkr.state == StateShutdown {
+				// Deleted/shutdown while we
+				// were probing others
+				continue
+			}
+			select {
+			case wkr.probing <- struct{}{}:
+				go func() {
+					wp.probeAndUpdate(wkr)
+					<-wkr.probing
+				}()
+			default:
+				wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
+			}
+			select {
+			case <-wp.stop:
+				return
+			case <-limitticker.C:
 			}
 		}
-	}()
+	}
+}
 
+func (wp *Pool) runSync() {
 	// sync once immediately, then wait syncInterval, sync again,
 	// etc.
 	timer := time.NewTimer(1)
@@ -585,7 +604,28 @@ func (wp *Pool) run() {
 }
 
 // caller must have lock.
-func (wp *Pool) autoShutdown(wkr *worker) bool {
+func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
+	if wkr.state == StateHold {
+		return
+	}
+	label, threshold := "", wp.timeoutProbe
+	if wkr.state == StateBooting {
+		label, threshold = "new ", wp.timeoutBooting
+	}
+	if dur < threshold {
+		return
+	}
+	wp.logger.WithFields(logrus.Fields{
+		"Instance": wkr.instance,
+		"Duration": dur,
+		"Since":    wkr.probed,
+		"State":    wkr.state,
+	}).Warnf("%sinstance unresponsive, shutting down", label)
+	wp.shutdown(wkr, wp.logger)
+}
+
+// caller must have lock.
+func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
 	if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
 		return false
 	}
@@ -608,13 +648,14 @@ func (wp *Pool) Stop() {
 	close(wp.stop)
 }
 
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
-	var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+	var r []InstanceView
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	for _, w := range wp.workers {
-		r = append(r, View{
+		r = append(r, InstanceView{
 			Instance:             w.instance.String(),
 			Price:                w.instType.Price,
 			ArvadosInstanceType:  w.instType.Name,
@@ -669,15 +710,18 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+	notify := false
 
 	for _, inst := range instances {
-		itTag := inst.Tags()["InstanceType"]
+		itTag := inst.Tags()[tagKeyInstanceType]
 		it, ok := wp.instanceTypes[itTag]
 		if !ok {
 			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
 			continue
 		}
-		wp.updateWorker(inst, it, StateUnknown)
+		if wp.updateWorker(inst, it, StateUnknown) {
+			notify = true
+		}
 	}
 
 	for id, wkr := range wp.workers {
@@ -691,13 +735,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 		logger.Info("instance disappeared in cloud")
 		delete(wp.workers, id)
 		go wkr.executor.Close()
-		go wp.notify()
+		notify = true
 	}
 
 	if !wp.loaded {
 		wp.loaded = true
 		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
 	}
+
+	if notify {
+		go wp.notify()
+	}
 }
 
 // should be called in a new goroutine
@@ -744,28 +792,12 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
 		} else {
 			logger.Info("instance not responding")
 		}
-
-		if wkr.state == StateHold {
-			return
-		}
-
-		label, threshold := "", wp.timeoutProbe
-		if wkr.state == StateBooting {
-			label, threshold = "new ", wp.timeoutBooting
-		}
-		if dur > threshold {
-			logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
-			wp.shutdown(wkr, logger)
-		}
+		wp.shutdownIfBroken(wkr, dur)
 		return
 	}
 
 	updateTime := time.Now()
 	wkr.probed = updateTime
-	if len(ctrUUIDs) > 0 {
-		wkr.busy = updateTime
-		wkr.lastUUID = ctrUUIDs[0]
-	}
 	if wkr.state == StateShutdown || wkr.state == StateHold {
 	} else if booted {
 		if wkr.state != StateRunning {
@@ -777,13 +809,18 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
 	}
 
 	if updated != wkr.updated {
-		// Worker was updated (e.g., by starting a new
-		// container) after the probe began. Avoid clobbering
-		// those changes with the probe results.
+		// Worker was updated after the probe began, so
+		// wkr.running might have a container UUID that was
+		// not yet running when ctrUUIDs was generated. Leave
+		// wkr.running alone and wait for the next probe to
+		// catch up on any changes.
 		return
 	}
-	if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
+	if len(ctrUUIDs) > 0 {
+		wkr.busy = updateTime
+		wkr.lastUUID = ctrUUIDs[0]
+	} else if len(wkr.running) > 0 {
 		wkr.unallocated = updateTime
 	}
 	running := map[string]struct{}{}
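The rename of View to InstanceView above also renames the pool's status accessor from View() to Instances(). Below is a minimal sketch of a caller, not part of this diff: the package name "monitor", the helper name, and the import path (inferred from the repository URL in the header above) are assumptions, and only the InstanceView fields visible in the first hunk are used.

package monitor

import (
	"fmt"

	// Assumed import path, inferred from the repository URL above.
	"git.arvados.org/arvados.git/lib/dispatchcloud/worker"
)

// PrintInstances is a hypothetical helper: it summarizes every worker
// in the pool via the renamed Instances()/InstanceView API.
func PrintInstances(wp *worker.Pool) {
	for _, iv := range wp.Instances() {
		// Instance, ArvadosInstanceType and Price are InstanceView
		// fields shown in the first hunk of this diff.
		fmt.Printf("%s\t%s\t%v\n", iv.Instance, iv.ArvadosInstanceType, iv.Price)
	}
}

Since Instances() acquires the pool's own mutex (the wp.mtx.Lock() context line in the hunk above), a caller like this sketch needs no additional locking.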