14325: Document Running() return value.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 2c2d977d874227d8ebea67392b4665e03492a80e..1665a1e43def02e14c7cf915c7f5410131f6c4c8 100644 (file)
@@ -67,6 +67,9 @@ const (
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
        quotaErrorTTL = time.Minute
+
+       // Time between "X failed because of rate limiting" messages
+       logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -85,7 +88,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
-               instanceSet:        instanceSet,
+               instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
@@ -115,7 +118,7 @@ type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
-       instanceSet        cloud.InstanceSet
+       instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
@@ -140,6 +143,9 @@ type Pool struct {
        mtx          sync.RWMutex
        setupOnce    sync.Once
 
+       throttleCreate    throttle
+       throttleInstances throttle
+
        mInstances         prometheus.Gauge
        mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
@@ -149,15 +155,21 @@ type Pool struct {
        mMemoryInuse       prometheus.Gauge
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
-//             // ...try scheduling some work...
+//             tryScheduling(wp)
 //             if done {
 //                     break
 //             }
@@ -222,13 +234,18 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create an instance; the Pool logs those errors,
+// so the caller need not. A true return value means only that an
+// attempt was started: background creation can still fail.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) {
-               return wp.atQuotaErr
+       if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+               return false
        }
        tags := cloud.InstanceTags{
                tagKeyInstanceType: it.Name,
@@ -249,17 +266,19 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                                break
                        }
                }
-               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-                       wp.atQuotaErr = err
-                       wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
-               }
                if err != nil {
+                       if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                               wp.atQuotaErr = err
+                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                       }
                        logger.WithError(err).Error("create failed")
+                       wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
-       return nil
+       return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
@@ -385,6 +404,12 @@ func (wp *Pool) CountWorkers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
@@ -677,10 +702,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
+       if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+               return err
+       }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
+               wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)