20457: When passing prior quota level, raise maxConcurrency slowly.
author Tom Clegg <tom@curii.com>
Mon, 7 Aug 2023 19:26:20 +0000 (15:26 -0400)
committer Tom Clegg <tom@curii.com>
Wed, 9 Aug 2023 15:47:49 +0000 (11:47 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/scheduler.go
lib/dispatchcloud/worker/pool.go
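
Taken together, the two halves of this change make the dispatcher back off and then recover gradually after hitting a cloud quota. The scheduler remembers the highest instance count achieved since the last quota error (instancesWithinQuota) and, once the pool stops reporting AtQuota, lets maxConcurrency exceed that known-working level by only one instance at a time instead of jumping straight back up. The worker pool, in turn, keeps reporting AtQuota after a quota error until it actually sees the instance count drop, falling back to the fixed quotaErrorTTL timer only when no instances exist at all. For example, if a quota error arrives while 7 instances exist, maxConcurrency is held at 7 until the pool releases AtQuota, then allowed to reach 8; once 8 instances are achieved, 9 is allowed, and so on, which avoids the lock/unlock churn described in the run_queue.go comment below.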

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 0c4634d7554a5487a55a0f9c53200e9beb01fabb..9817101bbb8a3b4461782565b1493c28de6aed0f 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -92,7 +92,13 @@ func (sch *Scheduler) runQueue() {
        if sch.maxInstances > 0 && sch.maxConcurrency > sch.maxInstances {
                sch.maxConcurrency = sch.maxInstances
        }
-       if sch.pool.AtQuota() && len(running) > 0 && (sch.maxConcurrency == 0 || sch.maxConcurrency > len(running)) {
+       instances := len(running) + len(unalloc)
+       if sch.instancesWithinQuota > 0 && sch.instancesWithinQuota < instances {
+               // Evidently it is possible to run this many
+               // instances, so raise our estimate.
+               sch.instancesWithinQuota = instances
+       }
+       if sch.pool.AtQuota() {
                // Consider current workload to be the maximum
                // allowed, for the sake of reporting metrics and
                // calculating max supervisors.
@@ -103,7 +109,27 @@ func (sch *Scheduler) runQueue() {
                // supervisors when we reach the cloud-imposed quota
                // (which may be based on # CPUs etc) long before the
                // configured MaxInstances.
-               sch.maxConcurrency = len(running)
+               if sch.maxConcurrency == 0 || sch.maxConcurrency > instances {
+                       if instances == 0 {
+                               sch.maxConcurrency = 1
+                       } else {
+                               sch.maxConcurrency = instances
+                       }
+               }
+               sch.instancesWithinQuota = instances
+       } else if sch.instancesWithinQuota > 0 && sch.maxConcurrency > sch.instancesWithinQuota+1 {
+               // Once we've hit a quota error and started tracking
+               // instancesWithinQuota (i.e., it's not zero), we
+               // avoid exceeding that known-working level by more
+               // than 1.
+               //
+               // If we don't do this, we risk entering a pattern of
+               // repeatedly locking several containers, hitting
+               // quota again, and unlocking them again each time the
+               // driver stops reporting AtQuota, which tends to use
+               // up the max lock/unlock cycles on the next few
+               // containers in the queue, and cause them to fail.
+               sch.maxConcurrency = sch.instancesWithinQuota + 1
        }
        sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
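The run_queue.go hunk above reduces to a small decision rule: while the pool reports AtQuota, clamp maxConcurrency to the current instance count and remember that count as the known-working level; afterwards, never let the limit exceed that level by more than one. The standalone sketch below uses the field names from the diff; the "want" parameter is a hypothetical stand-in for whatever higher limit the rest of runQueue would otherwise allow, which is not part of this hunk.

package main

import "fmt"

// quotaState mirrors the two scheduler fields touched by the hunk above.
// The field names match the diff; everything else is a simplified sketch.
type quotaState struct {
        maxConcurrency       int // dynamic container limit (0 = unlimited)
        instancesWithinQuota int // max concurrency achieved since last quota error
}

// update applies one runQueue pass: atQuota is what pool.AtQuota() reports,
// instances stands for len(running)+len(unalloc), and want is whatever
// higher limit the surrounding code would otherwise allow.
func (s *quotaState) update(atQuota bool, instances, want int) {
        s.maxConcurrency = want
        if s.instancesWithinQuota > 0 && s.instancesWithinQuota < instances {
                // Evidently it is possible to run this many instances,
                // so raise our estimate.
                s.instancesWithinQuota = instances
        }
        if atQuota {
                // Treat the current workload as the maximum allowed.
                if s.maxConcurrency == 0 || s.maxConcurrency > instances {
                        if instances == 0 {
                                s.maxConcurrency = 1
                        } else {
                                s.maxConcurrency = instances
                        }
                }
                s.instancesWithinQuota = instances
        } else if s.instancesWithinQuota > 0 && s.maxConcurrency > s.instancesWithinQuota+1 {
                // Creep past the known-working level one instance at a time.
                s.maxConcurrency = s.instancesWithinQuota + 1
        }
}

func main() {
        s := quotaState{}
        s.update(true, 7, 20) // quota error while 7 instances exist
        fmt.Println(s.maxConcurrency, s.instancesWithinQuota) // 7 7
        s.update(false, 7, 20) // error cleared: allow one more than the known level
        fmt.Println(s.maxConcurrency, s.instancesWithinQuota) // 8 7
        s.update(false, 8, 20) // 8 instances worked: allow one more again
        fmt.Println(s.maxConcurrency, s.instancesWithinQuota) // 9 8
}
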
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 1db12279dfe55fb67c237ea11052b864d0e69d45..ee7ab508839622af6f108cf6b98e6b48f6def603 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -46,10 +46,11 @@ type Scheduler struct {
        stop    chan struct{}
        stopped chan struct{}
 
-       last503time        time.Time // last time API responded 503
-       maxConcurrency     int       // dynamic container limit (0 = unlimited), see runQueue()
-       supervisorFraction float64   // maximum fraction of "supervisor" containers (these are containers who's main job is to launch other containers, e.g. workflow runners)
-       maxInstances       int       // maximum number of instances the pool will bring up (0 = unlimited)
+       last503time          time.Time // last time API responded 503
+       maxConcurrency       int       // dynamic container limit (0 = unlimited), see runQueue()
+       supervisorFraction   float64   // maximum fraction of "supervisor" containers (these are containers whose main job is to launch other containers, e.g. workflow runners)
+       maxInstances         int       // maximum number of instances the pool will bring up (0 = unlimited)
+       instancesWithinQuota int       // max concurrency achieved since last quota error (0 = no quota error yet)
 
        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 1d600e37020ec0f26108e1997adb79c1f787839c..f79bad98fc16397e85f1469ef8d8013214aad8f9 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -173,19 +173,20 @@ type Pool struct {
        runnerArgs                     []string // extra args passed to crunch-run
 
        // private state
-       subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
-       workers      map[cloud.InstanceID]*worker
-       loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
-       atQuotaUntil time.Time
-       atQuotaErr   cloud.QuotaError
-       stop         chan bool
-       mtx          sync.RWMutex
-       setupOnce    sync.Once
-       runnerData   []byte
-       runnerMD5    [md5.Size]byte
-       runnerCmd    string
+       subscribers                map[<-chan struct{}]chan<- struct{}
+       creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
+       workers                    map[cloud.InstanceID]*worker
+       loaded                     bool                 // loaded list of instances from InstanceSet at least once
+       exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
+       atQuotaUntilFewerInstances int
+       atQuotaUntil               time.Time
+       atQuotaErr                 cloud.QuotaError
+       stop                       chan bool
+       mtx                        sync.RWMutex
+       setupOnce                  sync.Once
+       runnerData                 []byte
+       runnerMD5                  [md5.Size]byte
+       runnerCmd                  string
 
        mContainersRunning        prometheus.Gauge
        mInstances                *prometheus.GaugeVec
@@ -322,6 +323,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) ||
+               wp.atQuotaUntilFewerInstances > 0 ||
                wp.instanceSet.throttleCreate.Error() != nil ||
                (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
                return false
@@ -360,8 +362,24 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
-                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
-                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                               n := len(wp.workers) + len(wp.creating) - 1
+                               if n < 1 {
+                                       // Quota error with no
+                                       // instances running --
+                                       // nothing to do but wait
+                                       wp.atQuotaUntilFewerInstances = 0
+                                       wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                                       time.AfterFunc(quotaErrorTTL, wp.notify)
+                                       logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
+                               } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
+                                       // Quota error with N
+                                       // instances running -- report
+                                       // AtQuota until some
+                                       // instances shut down
+                                       wp.atQuotaUntilFewerInstances = n
+                                       wp.atQuotaUntil = time.Time{}
+                                       logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
+                               }
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
@@ -381,7 +399,9 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
+       return wp.atQuotaUntilFewerInstances > 0 ||
+               time.Now().Before(wp.atQuotaUntil) ||
+               (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
 }
 
 // SetIdleBehavior determines how the indicated instance will behave
@@ -1032,6 +1052,10 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                notify = true
        }
 
+       if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
+               wp.atQuotaUntilFewerInstances = 0
+       }
+
        if !wp.loaded {
                notify = true
                wp.loaded = true
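
On the pool side, a quota error that arrives while other instances exist no longer just arms the fixed quotaErrorTTL timer. Instead, the pool records how many instances existed apart from the failed create (atQuotaUntilFewerInstances) and keeps reporting AtQuota, and refusing new Create calls, until sync() observes the instance count drop below that mark. A minimal sketch of that gate, using a simplified stand-in type rather than the real Pool:

package main

import (
        "fmt"
        "time"
)

// quotaGate is a stripped-down stand-in for the Pool fields added above;
// the real Pool holds much more state and a mutex around all of this.
type quotaGate struct {
        atQuotaUntil               time.Time
        atQuotaUntilFewerInstances int
}

// onQuotaError records a cloud quota error while count instances exist or
// are being created (including the create call that just failed).
func (g *quotaGate) onQuotaError(count int, quotaErrorTTL time.Duration) {
        n := count - 1 // exclude the create call that just failed
        if n < 1 {
                // Quota error with no instances running: nothing to do but wait.
                g.atQuotaUntilFewerInstances = 0
                g.atQuotaUntil = time.Now().Add(quotaErrorTTL)
        } else if n < g.atQuotaUntilFewerInstances || g.atQuotaUntilFewerInstances == 0 {
                // Quota error with n instances running: report AtQuota until
                // some of them shut down, however long that takes.
                g.atQuotaUntilFewerInstances = n
                g.atQuotaUntil = time.Time{}
        }
}

// sync clears the gate once the observed instance count drops below the
// recorded level (the new check added to Pool.sync above).
func (g *quotaGate) sync(count int) {
        if g.atQuotaUntilFewerInstances > count {
                g.atQuotaUntilFewerInstances = 0
        }
}

// atQuota mirrors the updated Pool.AtQuota, minus the MaxInstances check.
func (g *quotaGate) atQuota() bool {
        return g.atQuotaUntilFewerInstances > 0 || time.Now().Before(g.atQuotaUntil)
}

func main() {
        g := quotaGate{}
        g.onQuotaError(8, time.Minute) // create failed while 7 other instances exist
        fmt.Println(g.atQuota()) // true: don't try to create anything else yet
        g.sync(7)                // still 7 instances, keep holding
        fmt.Println(g.atQuota()) // true
        g.sync(6)                // an instance shut down, release the gate
        fmt.Println(g.atQuota()) // false
}

Because the added check near the top of Create also bails out while atQuotaUntilFewerInstances is nonzero, the pool cannot immediately re-trigger the same quota error; it has to wait for an instance to shut down or, when nothing was running at all, for quotaErrorTTL to expire.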