X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/de9d45e3a238df8e9f0b2833b86c5e54fec37c7a..71fd4da18b22100682ae7e2079aadfd66360d310:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index ea51d6c3e8..e6b5062989 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -155,15 +155,21 @@ type Pool struct { mMemoryInuse prometheus.Gauge } -// Subscribe returns a channel that becomes ready whenever a worker's -// state changes. +// Subscribe returns a buffered channel that becomes ready after any +// change to the pool's state that could have scheduling implications: +// a worker's state changes, a new worker appears, the cloud +// provider's API rate limiting period ends, etc. +// +// Additional events that occur while the channel is already ready +// will be dropped, so it is OK if the caller services the channel +// slowly. // // Example: // // ch := wp.Subscribe() // defer wp.Unsubscribe(ch) // for range ch { -// // ...try scheduling some work... +// tryScheduling(wp) // if done { // break // } @@ -198,7 +204,14 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { creating[it] = len(times) } for _, wkr := range wp.workers { - if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun { + // Skip workers that are not expected to become + // available soon. Note len(wkr.running)>0 is not + // redundant here: it can be true even in + // StateUnknown. + if wkr.state == StateShutdown || + wkr.state == StateRunning || + wkr.idleBehavior != IdleBehaviorRun || + len(wkr.running) > 0 { continue } it := wkr.instType @@ -264,6 +277,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { wp.atQuotaErr = err wp.atQuotaUntil = time.Now().Add(quotaErrorTTL) + time.AfterFunc(quotaErrorTTL, wp.notify) } logger.WithError(err).Error("create failed") wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify) @@ -397,6 +411,12 @@ func (wp *Pool) CountWorkers() map[State]int { } // Running returns the container UUIDs being prepared/run on workers. +// +// In the returned map, the time value indicates when the Pool +// observed that the container process had exited. A container that +// has not yet exited has a zero time value. The caller should use +// KillContainer() to garbage-collect the entries for exited +// containers. func (wp *Pool) Running() map[string]time.Time { wp.setupOnce.Do(wp.setup) wp.mtx.Lock()