// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
quotaErrorTTL = time.Minute
+
+ // Time between "X failed because rate limiting" messages
+ logRateLimitErrorInterval = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
wp := &Pool{
logger: logger,
arvClient: arvClient,
- instanceSet: instanceSet,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
- instanceSet cloud.InstanceSet
+ instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
imageID cloud.ImageID
mtx sync.RWMutex
setupOnce sync.Once
+ throttleCreate throttle
+ throttleInstances throttle
+
mInstances prometheus.Gauge
mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
mMemoryInuse prometheus.Gauge
}
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
//
// Example:
//
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
// for range ch {
-// // ...try scheduling some work...
+// tryScheduling(wp)
// if done {
// break
// }
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) {
- return wp.atQuotaErr
+ if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+ return false
}
tags := cloud.InstanceTags{
tagKeyInstanceType: it.Name,
break
}
}
- if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
- wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
- }
if err != nil {
+ if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ }
logger.WithError(err).Error("create failed")
+ wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
wp.updateWorker(inst, it, StateBooting)
}()
- return nil
+ return true
}
// AtQuota returns true if Create is not expected to work at the
}
// Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
+ if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+ return err
+ }
wp.logger.Debug("getting instance list")
threshold := time.Now()
instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
if err != nil {
+ wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
}
wp.sync(threshold, instances)