20984: Handle "instance type not available" condition better.
author     Tom Clegg <tom@curii.com>
           Mon, 16 Oct 2023 19:04:36 +0000 (15:04 -0400)
committer  Tom Clegg <tom@curii.com>
           Mon, 16 Oct 2023 19:07:27 +0000 (15:07 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/cloud/ec2/ec2.go
lib/cloud/ec2/ec2_test.go
lib/cloud/interfaces.go
lib/dispatchcloud/scheduler/interfaces.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/worker/pool.go

diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 5e4df05f46f01c9c2305d968771282c3f34e2013..816df48d90c7dced2bacc1c864ea71da8dccd3d2 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -665,21 +665,36 @@ func (err rateLimitError) EarliestRetry() time.Time {
        return err.earliestRetry
 }
 
-var isCodeCapacity = map[string]bool{
+type capacityError struct {
+       error
+       isInstanceTypeSpecific bool
+}
+
+func (er *capacityError) IsCapacityError() bool {
+       return true
+}
+
+func (er *capacityError) IsInstanceTypeSpecific() bool {
+       return er.isInstanceTypeSpecific
+}
+
+var isCodeQuota = map[string]bool{
        "InstanceLimitExceeded":             true,
        "InsufficientAddressCapacity":       true,
        "InsufficientFreeAddressesInSubnet": true,
-       "InsufficientInstanceCapacity":      true,
        "InsufficientVolumeCapacity":        true,
        "MaxSpotInstanceCountExceeded":      true,
        "VcpuLimitExceeded":                 true,
 }
 
-// isErrorCapacity returns whether the error is to be throttled based on its code.
+// isErrorQuota returns whether the error indicates we have reached
+// some usage quota/limit -- i.e., immediately retrying with an equal
+// or larger instance type will probably not work.
+//
 // Returns false if error is nil.
-func isErrorCapacity(err error) bool {
+func isErrorQuota(err error) bool {
        if aerr, ok := err.(awserr.Error); ok && aerr != nil {
-               if _, ok := isCodeCapacity[aerr.Code()]; ok {
+               if _, ok := isCodeQuota[aerr.Code()]; ok {
                        return true
                }
        }
@@ -720,8 +735,10 @@ func wrapError(err error, throttleValue *atomic.Value) error {
                }
                throttleValue.Store(d)
                return rateLimitError{error: err, earliestRetry: time.Now().Add(d)}
-       } else if isErrorCapacity(err) {
+       } else if isErrorQuota(err) {
                return &ec2QuotaError{err}
+       } else if aerr, ok := err.(awserr.Error); ok && aerr != nil && aerr.Code() == "InsufficientInstanceCapacity" {
+               return &capacityError{err, true}
        } else if err != nil {
                throttleValue.Store(time.Duration(0))
                return err
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 6fde4bbbcafb9ba8e43f0ab55cbe9d141ecc2c79..a57fcebf76874bea175462da8c272ee746991b82 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -508,8 +508,15 @@ func (*EC2InstanceSetSuite) TestWrapError(c *check.C) {
        _, ok := wrapped.(cloud.RateLimitError)
        c.Check(ok, check.Equals, true)
 
-       quotaError := awserr.New("InsufficientInstanceCapacity", "", nil)
+       quotaError := awserr.New("InstanceLimitExceeded", "", nil)
        wrapped = wrapError(quotaError, nil)
        _, ok = wrapped.(cloud.QuotaError)
        c.Check(ok, check.Equals, true)
+
+       capacityError := awserr.New("InsufficientInstanceCapacity", "", nil)
+       wrapped = wrapError(capacityError, nil)
+       caperr, ok := wrapped.(cloud.CapacityError)
+       c.Check(ok, check.Equals, true)
+       c.Check(caperr.IsCapacityError(), check.Equals, true)
+       c.Check(caperr.IsInstanceTypeSpecific(), check.Equals, true)
 }
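
Outside the tests, the practical effect of wrapError's new branch is that callers can now distinguish "back off entirely" (rate limit), "don't retry until something shuts down" (quota), and "don't retry this instance type for a while" (capacity). Below is a minimal sketch of such a caller; it assumes the usual git.arvados.org/arvados.git module path, and the handleCreateError helper and its log messages are illustrative, not code from this commit.

package dispatchexample

import (
	"log"

	"git.arvados.org/arvados.git/lib/cloud"
)

// handleCreateError shows how a caller of an InstanceSet's Create
// method might branch on the error classes produced by wrapError.
// (Hypothetical helper -- not part of this commit.)
func handleCreateError(err error) {
	switch err := err.(type) {
	case nil:
		// instance created
	case cloud.RateLimitError:
		log.Printf("cloud API throttled; retry after %v", err.EarliestRetry())
	case cloud.QuotaError:
		// Usage limit reached: retrying with the same or a larger
		// instance type will not help until something shuts down.
		log.Printf("at quota: %v", err)
	case cloud.CapacityError:
		if err.IsInstanceTypeSpecific() {
			log.Printf("provider out of capacity for this instance type: %v", err)
		} else {
			log.Printf("provider out of capacity: %v", err)
		}
	default:
		log.Printf("create failed: %v", err)
	}
}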
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
index 7c532fda4a9e6beaa1811ae3205dc592f186c315..a2aa9e143296f9251bf629c41863b6f985fac258 100644
--- a/lib/cloud/interfaces.go
+++ b/lib/cloud/interfaces.go
@@ -37,6 +37,20 @@ type QuotaError interface {
        error
 }
 
+// A CapacityError should be returned by an InstanceSet's Create
+// method when the cloud service indicates it has insufficient
+// capacity to create new instances -- i.e., we shouldn't retry right
+// away.
+type CapacityError interface {
+       // If true, wait before trying to create more instances.
+       IsCapacityError() bool
+       // If true, the condition is specific to the requested
+       // instance types.  Wait before trying to create more
+       // instances of that same type.
+       IsInstanceTypeSpecific() bool
+       error
+}
+
 type SharedResourceTags map[string]string
 type InstanceSetID string
 type InstanceTags map[string]string
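
Any driver, not just EC2, can surface this condition by returning an error that satisfies the new interface from its Create method; the dispatcher only consults the two boolean methods. A minimal sketch follows; providerCapacityError and wrapCapacity are hypothetical names, not code from this commit.

package exampledriver

import (
	"errors"

	"git.arvados.org/arvados.git/lib/cloud"
)

// providerCapacityError is a hypothetical driver-side error type that
// satisfies cloud.CapacityError by wrapping the provider's error and
// recording whether the shortage is limited to one instance type.
type providerCapacityError struct {
	error
	typeSpecific bool
}

func (e *providerCapacityError) IsCapacityError() bool        { return true }
func (e *providerCapacityError) IsInstanceTypeSpecific() bool { return e.typeSpecific }

// Compile-time check that the wrapper implements the new interface.
var _ cloud.CapacityError = (*providerCapacityError)(nil)

// wrapCapacity would be returned by the driver's Create method when
// the provider reports it cannot allocate the requested instance type.
func wrapCapacity(typeSpecific bool) error {
	return &providerCapacityError{
		error:        errors.New("provider reports insufficient capacity"),
		typeSpecific: typeSpecific,
	}
}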
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 78f8c804e20087aa2b49ff307e91dfb0681efcb9..6e56bd8c40962e9f9c6c595cf0ef4cc550fa82eb 100644
--- a/lib/dispatchcloud/scheduler/interfaces.go
+++ b/lib/dispatchcloud/scheduler/interfaces.go
@@ -34,6 +34,7 @@ type WorkerPool interface {
        Running() map[string]time.Time
        Unallocated() map[arvados.InstanceType]int
        CountWorkers() map[worker.State]int
+       AtCapacity(arvados.InstanceType) bool
        AtQuota() bool
        Create(arvados.InstanceType) bool
        Shutdown(arvados.InstanceType) bool
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 6a717bf44463b7688457260b934af091d8bfb4d7..3505c3e064e288a22d0c59cbbdfdadc65edd44ce 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -149,6 +149,7 @@ func (sch *Scheduler) runQueue() {
        }).Debug("runQueue")
 
        dontstart := map[arvados.InstanceType]bool{}
+       var atcapacity = map[string]bool{}    // ProviderTypes reported as AtCapacity during this runQueue() invocation
        var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
        var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
        var containerAllocatedWorkerBootingCount int
@@ -189,6 +190,11 @@ tryrun:
                                overquota = sorted[i:]
                                break tryrun
                        }
+                       if unalloc[it] < 1 && (atcapacity[it.ProviderType] || sch.pool.AtCapacity(it)) {
+                               logger.Trace("not locking: AtCapacity and no unalloc workers")
+                               atcapacity[it.ProviderType] = true
+                               continue
+                       }
                        if sch.pool.KillContainer(ctr.UUID, "about to lock") {
                                logger.Info("not locking: crunch-run process from previous attempt has not exited")
                                continue
@@ -211,6 +217,27 @@ tryrun:
                                logger.Trace("overquota")
                                overquota = sorted[i:]
                                break tryrun
+                       } else if atcapacity[it.ProviderType] || sch.pool.AtCapacity(it) {
+                               // Continue trying lower-priority
+                               // containers in case they can run on
+                               // different instance types that are
+                               // available.
+                               //
+                               // The local "atcapacity" cache helps
+                               // when the pool's flag resets after
+                               // we look at container A but before
+                               // we look at lower-priority container
+                               // B. In that case we want to run
+                               // container A on the next call to
+                               // runQueue(), rather than run
+                               // container B now.
+                               //
+                               // TODO: try running this container on
+                               // a bigger (but not much more
+                               // expensive) instance type.
+                               logger.WithField("InstanceType", it.Name).Trace("at capacity")
+                               atcapacity[it.ProviderType] = true
+                               continue
                        } else if sch.pool.Create(it) {
                                // Success. (Note pool.Create works
                                // asynchronously and does its own
@@ -219,10 +246,7 @@ tryrun:
                                logger.Info("creating new instance")
                        } else {
                                // Failed despite not being at quota,
-                               // e.g., cloud ops throttled.  TODO:
-                               // avoid getting starved here if
-                               // instances of a specific type always
-                               // fail.
+                               // e.g., cloud ops throttled.
                                logger.Trace("pool declined to create new instance")
                                continue
                        }
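
The comment added to runQueue above explains why the atcapacity cache is scoped to a single pass. The same rule, distilled into a standalone sketch with simplified stand-in types (instanceType, pool, and scheduleOnePass are illustrative, not Arvados code): once a provider type is seen at capacity, every later container needing that type is skipped, while containers that map to other provider types are still considered.

package schedexample

// instanceType is a simplified stand-in for arvados.InstanceType.
type instanceType struct {
	Name         string
	ProviderType string
}

// pool is a simplified stand-in for the worker pool.
type pool interface {
	AtCapacity(instanceType) bool
	Create(instanceType) bool
}

// scheduleOnePass illustrates the new skip rule: a provider type is
// treated as at-capacity for the rest of the pass once either the pool
// reports it or we observed it earlier in this same pass, so a
// lower-priority container cannot jump ahead if the pool's flag
// happens to clear mid-pass.
func scheduleOnePass(p pool, wanted []instanceType) (created []instanceType) {
	atcapacity := map[string]bool{} // provider types seen at capacity during this pass
	for _, it := range wanted {
		if atcapacity[it.ProviderType] || p.AtCapacity(it) {
			atcapacity[it.ProviderType] = true
			continue
		}
		if p.Create(it) {
			created = append(created, it)
		}
	}
	return created
}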
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index a8df944b4c9faa30488efe5a7e6e47986e98ec58..76eecead5ed002171b7636a3ef38b9bc3acfb5e4 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -32,10 +32,12 @@ var (
 type stubPool struct {
        notify    <-chan struct{}
        unalloc   map[arvados.InstanceType]int // idle+booting+unknown
+       busy      map[arvados.InstanceType]int
        idle      map[arvados.InstanceType]int
        unknown   map[arvados.InstanceType]int
        running   map[string]time.Time
        quota     int
+       capacity  map[string]int
        canCreate int
        creates   []arvados.InstanceType
        starts    []string
@@ -55,6 +57,20 @@ func (p *stubPool) AtQuota() bool {
        }
        return n >= p.quota
 }
+func (p *stubPool) AtCapacity(it arvados.InstanceType) bool {
+       supply, ok := p.capacity[it.ProviderType]
+       if !ok {
+               return false
+       }
+       for _, existing := range []map[arvados.InstanceType]int{p.unalloc, p.busy} {
+               for eit, n := range existing {
+                       if eit.ProviderType == it.ProviderType {
+                               supply -= n
+                       }
+               }
+       }
+       return supply < 1
+}
 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
 func (p *stubPool) Running() map[string]time.Time {
@@ -116,6 +132,7 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container
        if p.idle[it] == 0 {
                return false
        }
+       p.busy[it]++
        p.idle[it]--
        p.unalloc[it]--
        p.running[ctr.UUID] = time.Time{}
@@ -186,6 +203,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                        test.InstanceType(1): 1,
                        test.InstanceType(2): 2,
                },
+               busy:      map[arvados.InstanceType]int{},
                running:   map[string]time.Time{},
                canCreate: 0,
        }
@@ -236,6 +254,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        idle: map[arvados.InstanceType]int{
                                test.InstanceType(2): 2,
                        },
+                       busy:      map[arvados.InstanceType]int{},
                        running:   map[string]time.Time{},
                        creates:   []arvados.InstanceType{},
                        starts:    []string{},
@@ -272,6 +291,85 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
        }
 }
 
+// If pool.AtCapacity(it) is true for one instance type, try running a
+// lower-priority container that uses a different node type.  Don't
+// lock/unlock/start any container that requires the affected instance
+// type.
+func (*SchedulerSuite) TestInstanceCapacity(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+
+       queue := test.Queue{
+               ChooseType: chooseType,
+               Containers: []arvados.Container{
+                       {
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(2),
+                               Priority: 2,
+                               State:    arvados.ContainerStateQueued,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 4,
+                                       RAM:   4 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(3),
+                               Priority: 3,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 4,
+                                       RAM:   4 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(4),
+                               Priority: 4,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 4,
+                                       RAM:   4 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       pool := stubPool{
+               quota:    99,
+               capacity: map[string]int{test.InstanceType(4).ProviderType: 1},
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(4): 1,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(4): 1,
+               },
+               busy:      map[arvados.InstanceType]int{},
+               running:   map[string]time.Time{},
+               creates:   []arvados.InstanceType{},
+               starts:    []string{},
+               canCreate: 99,
+       }
+       sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
+       sch.sync()
+       sch.runQueue()
+       sch.sync()
+
+       // Start container4, but then pool reports AtCapacity for
+       // type4, so we skip trying to create an instance for
+       // container3 and skip locking container2, but do try to
+       // create a type1 instance for container1.
+       c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4), test.ContainerUUID(1)})
+       c.Check(pool.shutdowns, check.Equals, 0)
+       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
+       c.Check(queue.StateChanges(), check.HasLen, 0)
+}
+
 // Don't unlock containers or shutdown unalloc (booting/idle) nodes
 // just because some 503 errors caused us to reduce maxConcurrency
 // below the current load level.
@@ -347,6 +445,10 @@ func (*SchedulerSuite) TestIdleIn503QuietPeriod(c *check.C) {
                        test.InstanceType(2): 1,
                        test.InstanceType(3): 1,
                },
+               busy: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 1,
+                       test.InstanceType(3): 1,
+               },
                running: map[string]time.Time{
                        test.ContainerUUID(1): {},
                        test.ContainerUUID(3): {},
@@ -400,6 +502,9 @@ func (*SchedulerSuite) TestUnlockExcessSupervisors(c *check.C) {
                idle: map[arvados.InstanceType]int{
                        test.InstanceType(2): 1,
                },
+               busy: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 4,
+               },
                running: map[string]time.Time{
                        test.ContainerUUID(1): {},
                        test.ContainerUUID(2): {},
@@ -461,6 +566,9 @@ func (*SchedulerSuite) TestExcessSupervisors(c *check.C) {
                idle: map[arvados.InstanceType]int{
                        test.InstanceType(2): 1,
                },
+               busy: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 2,
+               },
                running: map[string]time.Time{
                        test.ContainerUUID(5): {},
                        test.ContainerUUID(6): {},
@@ -515,6 +623,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
                idle: map[arvados.InstanceType]int{
                        test.InstanceType(3): 2,
                },
+               busy:      map[arvados.InstanceType]int{},
                running:   map[string]time.Time{},
                creates:   []arvados.InstanceType{},
                starts:    []string{},
@@ -553,6 +662,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                        test.InstanceType(1): 1,
                        test.InstanceType(2): 1,
                },
+               busy:      map[arvados.InstanceType]int{},
                running:   map[string]time.Time{},
                canCreate: 4,
        }
@@ -646,6 +756,9 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
                idle: map[arvados.InstanceType]int{
                        test.InstanceType(2): 0,
                },
+               busy: map[arvados.InstanceType]int{
+                       test.InstanceType(2): 1,
+               },
                running: map[string]time.Time{
                        test.ContainerUUID(2): {},
                },
@@ -742,6 +855,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
        pool = stubPool{
                idle:    map[arvados.InstanceType]int{test.InstanceType(1): 1},
                unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+               busy:    map[arvados.InstanceType]int{},
                running: map[string]time.Time{},
        }
        sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond, 0, 0, 0)
@@ -815,6 +929,7 @@ func (*SchedulerSuite) TestSkipSupervisors(c *check.C) {
                        test.InstanceType(1): 4,
                        test.InstanceType(2): 4,
                },
+               busy:      map[arvados.InstanceType]int{},
                running:   map[string]time.Time{},
                canCreate: 0,
        }
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 15b0dbcde57d4d3af93233b11bcc663973015a58..fc9f5445d6504dff89f02ebc9de65b80f56f28b4 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -82,6 +82,9 @@ const (
        // instances have been shutdown.
        quotaErrorTTL = time.Minute
 
+       // Time after a capacity error to try again
+       capacityErrorTTL = time.Minute
+
        // Time between "X failed because rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
 )
@@ -181,6 +184,7 @@ type Pool struct {
        atQuotaUntilFewerInstances int
        atQuotaUntil               time.Time
        atQuotaErr                 cloud.QuotaError
+       atCapacityUntil            map[string]time.Time
        stop                       chan bool
        mtx                        sync.RWMutex
        setupOnce                  sync.Once
@@ -320,14 +324,11 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                // Boot probe is certain to fail.
                return false
        }
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) ||
-               wp.atQuotaUntilFewerInstances > 0 ||
-               wp.instanceSet.throttleCreate.Error() != nil ||
-               (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
+       if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
                return false
        }
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
        // The maxConcurrentInstanceCreateOps knob throttles the number of node create
        // requests in flight. It was added to work around a limitation in Azure's
        // managed disks, which support no more than 20 concurrent node creation
@@ -381,6 +382,18 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                                        logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
                                }
                        }
+                       if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
+                               capKey := it.ProviderType
+                               if !err.IsInstanceTypeSpecific() {
+                                       // set capacity flag for all
+                                       // instance types
+                                       capKey = ""
+                               }
+                               if wp.atCapacityUntil == nil {
+                                       wp.atCapacityUntil = map[string]time.Time{}
+                               }
+                               wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
+                       }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
@@ -393,6 +406,22 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        return true
 }
 
+// AtCapacity returns true if Create() is currently expected to fail
+// for the given instance type.
+func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
+               // at capacity for this instance type
+               return true
+       }
+       if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
+               // at capacity for all instance types
+               return true
+       }
+       return false
+}
+
 // AtQuota returns true if Create is not expected to work at the
 // moment (e.g., cloud provider has reported quota errors, or we are
 // already at our own configured quota).
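
On the pool side the change reduces to TTL bookkeeping: a capacity error flags either one provider type or, via the "" key, every type, and the flag expires after capacityErrorTTL. A standalone sketch of just that bookkeeping, using hypothetical names (capacityTracker, recordCapacityError) rather than the real Pool type:

package poolexample

import (
	"sync"
	"time"
)

// capacityTracker is an illustrative stand-in for the atCapacityUntil
// map added to Pool above.
type capacityTracker struct {
	mtx   sync.Mutex
	until map[string]time.Time
	ttl   time.Duration
}

// recordCapacityError flags one provider type, or all of them (key "")
// when the error is not instance-type specific.
func (ct *capacityTracker) recordCapacityError(providerType string, typeSpecific bool) {
	ct.mtx.Lock()
	defer ct.mtx.Unlock()
	key := providerType
	if !typeSpecific {
		key = ""
	}
	if ct.until == nil {
		ct.until = map[string]time.Time{}
	}
	ct.until[key] = time.Now().Add(ct.ttl)
}

// atCapacity reports whether Create is currently expected to fail for
// the given provider type, i.e. either its own flag or the global ""
// flag has not yet expired.
func (ct *capacityTracker) atCapacity(providerType string) bool {
	ct.mtx.Lock()
	defer ct.mtx.Unlock()
	now := time.Now()
	if t, ok := ct.until[providerType]; ok && now.Before(t) {
		return true
	}
	if t, ok := ct.until[""]; ok && now.Before(t) {
		return true
	}
	return false
}

The TTL mirrors the existing quotaErrorTTL approach: rather than probing the cloud continuously, the pool simply assumes the shortage persists for a minute before trying that instance type again.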