X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/262d59632f85b34ef4e2bcb1ee323a6e3b4435ed..be8ed479042df4fdefe1fd18c1e2e984e1c99bc0:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 2c2d977d87..1665a1e43d 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -67,6 +67,9 @@ const ( // Time after a quota error to try again anyway, even if no // instances have been shutdown. quotaErrorTTL = time.Minute + + // Time between repeated "X failed because of rate limiting" log messages + logRateLimitErrorInterval = time.Second * 10 ) func duration(conf arvados.Duration, def time.Duration) time.Duration { @@ -85,7 +88,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe wp := &Pool{ logger: logger, arvClient: arvClient, - instanceSet: instanceSet, + instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, bootProbeCommand: cluster.CloudVMs.BootProbeCommand, imageID: cloud.ImageID(cluster.CloudVMs.ImageID), @@ -115,7 +118,7 @@ type Pool struct { // configuration logger logrus.FieldLogger arvClient *arvados.Client - instanceSet cloud.InstanceSet + instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor bootProbeCommand string imageID cloud.ImageID @@ -140,6 +143,9 @@ type Pool struct { mtx sync.RWMutex setupOnce sync.Once + throttleCreate throttle + throttleInstances throttle + mInstances prometheus.Gauge mInstancesPrice prometheus.Gauge mContainersRunning prometheus.Gauge @@ -149,15 +155,21 @@ type Pool struct { mMemoryInuse prometheus.Gauge } -// Subscribe returns a channel that becomes ready whenever a worker's -// state changes. +// Subscribe returns a buffered channel that becomes ready after any +// change to the pool's state that could have scheduling implications: +// a worker's state changes, a new worker appears, the cloud +// provider's API rate limiting period ends, etc.
+// +// Additional events that occur while the channel is already ready +// will be dropped, so it is OK if the caller services the channel +// slowly. // // Example: // // ch := wp.Subscribe() // defer wp.Unsubscribe(ch) // for range ch { -// // ...try scheduling some work... +// tryScheduling(wp) // if done { // break // } @@ -222,13 +234,18 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int { // Create a new instance with the given type, and add it to the worker // pool. The worker is added immediately; instance creation runs in // the background. -func (wp *Pool) Create(it arvados.InstanceType) error { +// +// Create returns false if a pre-existing error state prevents it from +// even attempting to create a new instance. Those errors are logged +// by the Pool, so the caller does not need to log anything in such +// cases. +func (wp *Pool) Create(it arvados.InstanceType) bool { logger := wp.logger.WithField("InstanceType", it.Name) wp.setupOnce.Do(wp.setup) wp.mtx.Lock() defer wp.mtx.Unlock() - if time.Now().Before(wp.atQuotaUntil) { - return wp.atQuotaErr + if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil { + return false } tags := cloud.InstanceTags{ tagKeyInstanceType: it.Name, @@ -249,17 +266,19 @@ func (wp *Pool) Create(it arvados.InstanceType) error { break } } - if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { - wp.atQuotaErr = err - wp.atQuotaUntil = time.Now().Add(quotaErrorTTL) - } if err != nil { + if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { + wp.atQuotaErr = err + wp.atQuotaUntil = time.Now().Add(quotaErrorTTL) + time.AfterFunc(quotaErrorTTL, wp.notify) + } logger.WithError(err).Error("create failed") + wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify) return } wp.updateWorker(inst, it, StateBooting) }() - return nil + return true } // AtQuota returns true if Create is not expected to work at the @@ -385,6 +404,12 @@ func (wp
*Pool) CountWorkers() map[State]int { } // Running returns the container UUIDs being prepared/run on workers. +// +// In the returned map, the time value indicates when the Pool +// observed that the container process had exited. A container that +// has not yet exited has a zero time value. The caller should use +// KillContainer() to garbage-collect the entries for exited +// containers. func (wp *Pool) Running() map[string]time.Time { wp.setupOnce.Do(wp.setup) wp.mtx.Lock() @@ -677,10 +702,14 @@ func (wp *Pool) notify() { func (wp *Pool) getInstancesAndSync() error { wp.setupOnce.Do(wp.setup) + if err := wp.instanceSet.throttleInstances.Error(); err != nil { + return err + } wp.logger.Debug("getting instance list") threshold := time.Now() instances, err := wp.instanceSet.Instances(cloud.InstanceTags{}) if err != nil { + wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify) return err } wp.sync(threshold, instances)