16739: implement review feedback.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index ec6a049e640e7e1d619251e21af07f418aa28713..435b6e43ae4a3e150f680883aa7e124daf6e4230 100644 (file)
@@ -96,28 +96,28 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 // cluster configuration.
 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
-               logger:                     logger,
-               arvClient:                  arvClient,
-               instanceSetID:              instanceSetID,
-               instanceSet:                &throttledInstanceSet{InstanceSet: instanceSet},
-               newExecutor:                newExecutor,
-               bootProbeCommand:           cluster.Containers.CloudVMs.BootProbeCommand,
-               runnerSource:               cluster.Containers.CloudVMs.DeployRunnerBinary,
-               imageID:                    cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
-               instanceTypes:              cluster.InstanceTypes,
-               maxProbesPerSecond:         cluster.Containers.CloudVMs.MaxProbesPerSecond,
-               maxConcurrentNodeCreateOps: cluster.Containers.CloudVMs.MaxConcurrentNodeCreateOps,
-               probeInterval:              duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
-               syncInterval:               duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
-               timeoutIdle:                duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
-               timeoutBooting:             duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
-               timeoutProbe:               duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
-               timeoutShutdown:            duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
-               timeoutTERM:                duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
-               timeoutSignal:              duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
-               installPublicKey:           installPublicKey,
-               tagKeyPrefix:               cluster.Containers.CloudVMs.TagKeyPrefix,
-               stop:                       make(chan bool),
+               logger:                         logger,
+               arvClient:                      arvClient,
+               instanceSetID:                  instanceSetID,
+               instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+               newExecutor:                    newExecutor,
+               bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
+               runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
+               imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
+               instanceTypes:                  cluster.InstanceTypes,
+               maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
+               maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
+               probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+               syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+               timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+               installPublicKey:               installPublicKey,
+               tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
+               stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
@@ -133,27 +133,27 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 // zero Pool should not be used. Call NewPool to create a new Pool.
 type Pool struct {
        // configuration
-       logger                     logrus.FieldLogger
-       arvClient                  *arvados.Client
-       instanceSetID              cloud.InstanceSetID
-       instanceSet                *throttledInstanceSet
-       newExecutor                func(cloud.Instance) Executor
-       bootProbeCommand           string
-       runnerSource               string
-       imageID                    cloud.ImageID
-       instanceTypes              map[string]arvados.InstanceType
-       syncInterval               time.Duration
-       probeInterval              time.Duration
-       maxProbesPerSecond         int
-       maxConcurrentNodeCreateOps int
-       timeoutIdle                time.Duration
-       timeoutBooting             time.Duration
-       timeoutProbe               time.Duration
-       timeoutShutdown            time.Duration
-       timeoutTERM                time.Duration
-       timeoutSignal              time.Duration
-       installPublicKey           ssh.PublicKey
-       tagKeyPrefix               string
+       logger                         logrus.FieldLogger
+       arvClient                      *arvados.Client
+       instanceSetID                  cloud.InstanceSetID
+       instanceSet                    *throttledInstanceSet
+       newExecutor                    func(cloud.Instance) Executor
+       bootProbeCommand               string
+       runnerSource                   string
+       imageID                        cloud.ImageID
+       instanceTypes                  map[string]arvados.InstanceType
+       syncInterval                   time.Duration
+       probeInterval                  time.Duration
+       maxProbesPerSecond             int
+       maxConcurrentInstanceCreateOps int
+       timeoutIdle                    time.Duration
+       timeoutBooting                 time.Duration
+       timeoutProbe                   time.Duration
+       timeoutShutdown                time.Duration
+       timeoutTERM                    time.Duration
+       timeoutSignal                  time.Duration
+       installPublicKey               ssh.PublicKey
+       tagKeyPrefix                   string
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -280,13 +280,6 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        return unalloc
 }
 
-type RateLimitError struct{ Retry time.Time }
-
-func (e RateLimitError) Error() string {
-       return fmt.Sprintf("node creation request failed, hit maxConcurrentNodeCreateOps, wait until %s", e.Retry)
-}
-func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
-
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
@@ -307,15 +300,16 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
                return false
        }
-       // The maxConcurrentNodeCreateOps knob throttles the number of node create
+       // The maxConcurrentInstanceCreateOps knob throttles the number of instance create
        // requests in flight. It was added to work around a limitation in Azure's
        // managed disks, which support no more than 20 concurrent node creation
        // requests from a single disk image (cf.
        // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
        // The code assumes that node creation, from Azure's perspective, means the
        // period until the instance appears in the "get all instances" list.
-       if wp.maxConcurrentNodeCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentNodeCreateOps {
-               wp.instanceSet.throttleCreate.CheckRateLimitError(RateLimitError{Retry: time.Now().Add(5 * time.Second)}, wp.logger, "create instance", wp.notify)
+       if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
+               logger.Info("reached MaxConcurrentInstanceCreateOps")
+               wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
                return false
        }
        now := time.Now()