14325: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / pool.go
index ea51d6c3e858752a8add9b2f0adda1057dcf3b55..e6b50629892e62c250ebcf05ba5bb226daec1ce1 100644 (file)
@@ -155,15 +155,21 @@ type Pool struct {
        mMemoryInuse       prometheus.Gauge
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
-//             // ...try scheduling some work...
+//             tryScheduling(wp)
 //             if done {
 //                     break
 //             }
@@ -198,7 +204,14 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
-               if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
+               // Skip workers that are not expected to become
+               // available soon. Note len(wkr.running)>0 is not
+               // redundant here: it can be true even in
+               // StateUnknown.
+               if wkr.state == StateShutdown ||
+                       wkr.state == StateRunning ||
+                       wkr.idleBehavior != IdleBehaviorRun ||
+                       len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
@@ -264,6 +277,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                               time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
@@ -397,6 +411,12 @@ func (wp *Pool) CountWorkers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()