Merge branch '19973-dispatch-throttle' into main
author Tom Clegg <tom@curii.com>
Tue, 21 Feb 2023 22:04:30 +0000 (17:04 -0500)
committer Tom Clegg <tom@curii.com>
Tue, 21 Feb 2023 22:04:30 +0000 (17:04 -0500)
fixes #19973
fixes #19984

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/scheduler.go
lib/dispatchcloud/scheduler/sync_test.go
lib/dispatchcloud/worker/pool.go
sdk/go/arvados/client.go
sdk/go/arvados/limiter.go [new file with mode: 0644]
sdk/go/arvados/limiter_test.go [new file with mode: 0644]

index 3403c50c972987e7f6f21a927a6db592fac9f6fc..270c6d43dd12599d4ba077cd740fdfb15ce93bd3 100644 (file)
@@ -193,7 +193,7 @@ func (disp *dispatcher) run() {
        if pollInterval <= 0 {
                pollInterval = defaultPollInterval
        }
-       sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
+       sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
        sched.Start()
        defer sched.Stop()
 
index 2d486da5fd5a9d4aafbbc0b82f06d0c20c7f91e8..a9ed95c7c3b5f581c4e3a60776f0d5c207c34db9 100644 (file)
@@ -6,11 +6,13 @@ package dispatchcloud
 
 import (
        "context"
+       "crypto/tls"
        "encoding/json"
        "io/ioutil"
        "math/rand"
        "net/http"
        "net/http/httptest"
+       "net/url"
        "os"
        "sync"
        "time"
@@ -28,11 +30,12 @@ import (
 var _ = check.Suite(&DispatcherSuite{})
 
 type DispatcherSuite struct {
-       ctx        context.Context
-       cancel     context.CancelFunc
-       cluster    *arvados.Cluster
-       stubDriver *test.StubDriver
-       disp       *dispatcher
+       ctx            context.Context
+       cancel         context.CancelFunc
+       cluster        *arvados.Cluster
+       stubDriver     *test.StubDriver
+       disp           *dispatcher
+       error503Server *httptest.Server
 }
 
 func (s *DispatcherSuite) SetUpTest(c *check.C) {
@@ -100,6 +103,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
        arvClient, err := arvados.NewClientFromConfig(s.cluster)
        c.Check(err, check.IsNil)
 
+       s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) }))
+       arvClient.Client = &http.Client{
+               Transport: &http.Transport{
+                       Proxy: s.arvClientProxy(c),
+                       TLSClientConfig: &tls.Config{
+                               InsecureSkipVerify: true}}}
+
        s.disp = &dispatcher{
                Cluster:   s.cluster,
                Context:   s.ctx,
@@ -115,6 +125,20 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 func (s *DispatcherSuite) TearDownTest(c *check.C) {
        s.cancel()
        s.disp.Close()
+       s.error503Server.Close()
+}
+
+// Intercept outgoing API requests for "/503" and respond HTTP
+// 503. This lets us force (*arvados.Client)Last503() to return
+// something.
+func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
+       return func(req *http.Request) (*url.URL, error) {
+               if req.URL.Path == "/503" {
+                       return url.Parse(s.error503Server.URL)
+               } else {
+                       return nil, nil
+               }
+       }
 }
 
 // DispatchToStubDriver checks that the dispatcher wires everything
@@ -157,6 +181,10 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        return
                }
                delete(waiting, ctr.UUID)
+               if len(waiting) == 100 {
+                       // trigger scheduler maxConcurrency limit
+                       s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
+               }
                if len(waiting) == 0 {
                        close(done)
                }
@@ -230,7 +258,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
        c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
        c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
-       c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} 0.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} [0-9]+\n.*`)
        c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="disappeared"} [^0].*`)
        c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="failure"} [^0].*`)
        c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="success"} [^0].*`)
@@ -250,6 +278,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="success"} [0-9e+.]*`)
        c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="fail"} [0-9]*`)
        c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="fail"} [0-9e+.]*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*last_503_time [1-9][0-9e+.]*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
 }
 
 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
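
For illustration, the 503-injection trick used in SetUpTest and arvClientProxy above can be reproduced on its own roughly as follows (a sketch only: plain HTTP instead of the test's TLS stub, hypothetical names, not part of the test suite). A Proxy callback on http.Transport diverts one magic path to a stub server that always answers 503, leaving every other request untouched.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

func main() {
	// Stub server that always answers 503.
	stub503 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusServiceUnavailable)
	}))
	defer stub503.Close()

	client := &http.Client{Transport: &http.Transport{
		// Proxy is consulted for every request; returning nil, nil
		// means "connect directly", so only the magic "/503" path
		// is diverted to the stub.
		Proxy: func(req *http.Request) (*url.URL, error) {
			if req.URL.Path == "/503" {
				return url.Parse(stub503.URL)
			}
			return nil, nil
		},
	}}

	resp, err := client.Get("http://example.invalid/503")
	if err == nil {
		fmt.Println(resp.StatusCode) // 503, served by the stub
		resp.Body.Close()
	}
}
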
index f729f0dc23a7f927eca6c39fe75734a4a2355ad9..1e5ac2e0466b97730d5eccc9620134d021a66c20 100644 (file)
@@ -13,6 +13,8 @@ import (
        "github.com/sirupsen/logrus"
 )
 
+var quietAfter503 = time.Minute
+
 func (sch *Scheduler) runQueue() {
        unsorted, _ := sch.queue.Entries()
        sorted := make([]container.QueueEnt, 0, len(unsorted))
@@ -35,15 +37,53 @@ func (sch *Scheduler) runQueue() {
        running := sch.pool.Running()
        unalloc := sch.pool.Unallocated()
 
+       if t := sch.client.Last503(); t.After(sch.last503time) {
+               // API has sent an HTTP 503 response since last time
+               // we checked. Use current #containers - 1 as
+               // maxConcurrency, i.e., try to stay just below the
+               // level where we see 503s.
+               sch.last503time = t
+               if newlimit := len(running) - 1; newlimit < 1 {
+                       sch.maxConcurrency = 1
+               } else {
+                       sch.maxConcurrency = newlimit
+               }
+       } else if sch.maxConcurrency > 0 && time.Since(sch.last503time) > quietAfter503 {
+               // If we haven't seen any 503 errors lately, raise
+               // limit to ~10% beyond the current workload.
+               //
+               // As we use the added 10% to schedule more
+               // containers, len(running) will increase and we'll
+               // push the limit up further. Soon enough,
+               // maxConcurrency will get high enough to schedule the
+               // entire queue, hit pool quota, or get 503s again.
+               max := len(running)*11/10 + 1
+               if sch.maxConcurrency < max {
+                       sch.maxConcurrency = max
+               }
+       }
+       if sch.last503time.IsZero() {
+               sch.mLast503Time.Set(0)
+       } else {
+               sch.mLast503Time.Set(float64(sch.last503time.Unix()))
+       }
+       sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
+
        sch.logger.WithFields(logrus.Fields{
-               "Containers": len(sorted),
-               "Processes":  len(running),
+               "Containers":     len(sorted),
+               "Processes":      len(running),
+               "maxConcurrency": sch.maxConcurrency,
        }).Debug("runQueue")
 
        dontstart := map[arvados.InstanceType]bool{}
        var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
        var containerAllocatedWorkerBootingCount int
 
+       // trying is #containers running + #containers we're trying to
+       // start. We stop trying to start more containers if this
+       // reaches the dynamic maxConcurrency limit.
+       trying := len(running)
+
 tryrun:
        for i, ctr := range sorted {
                ctr, it := ctr.Container, ctr.InstanceType
@@ -56,8 +96,14 @@ tryrun:
                }
                switch ctr.State {
                case arvados.ContainerStateQueued:
+                       if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+                               logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
+                               overquota = sorted[i:]
+                               break tryrun
+                       }
+                       trying++
                        if unalloc[it] < 1 && sch.pool.AtQuota() {
-                               logger.Debug("not locking: AtQuota and no unalloc workers")
+                               logger.Trace("not locking: AtQuota and no unalloc workers")
                                overquota = sorted[i:]
                                break tryrun
                        }
@@ -68,6 +114,12 @@ tryrun:
                        go sch.lockContainer(logger, ctr.UUID)
                        unalloc[it]--
                case arvados.ContainerStateLocked:
+                       if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+                               logger.Debugf("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+                               overquota = sorted[i:]
+                               break tryrun
+                       }
+                       trying++
                        if unalloc[it] > 0 {
                                unalloc[it]--
                        } else if sch.pool.AtQuota() {
@@ -102,8 +154,10 @@ tryrun:
                        } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
                                logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
                        } else if sch.pool.StartContainer(it, ctr) {
+                               logger.Trace("StartContainer => true")
                                // Success.
                        } else {
+                               logger.Trace("StartContainer => false")
                                containerAllocatedWorkerBootingCount += 1
                                dontstart[it] = true
                        }
@@ -115,10 +169,13 @@ tryrun:
 
        if len(overquota) > 0 {
                // Unlock any containers that are unmappable while
-               // we're at quota.
+               // we're at quota (but if they have already been
+               // scheduled and they're loading docker images etc.,
+               // let them run).
                for _, ctr := range overquota {
                        ctr := ctr.Container
-                       if ctr.State == arvados.ContainerStateLocked {
+                       _, toolate := running[ctr.UUID]
+                       if ctr.State == arvados.ContainerStateLocked && !toolate {
                                logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
                                logger.Debug("unlock because pool capacity is used by higher priority containers")
                                err := sch.queue.Unlock(ctr.UUID)
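
Read on its own, the throttling rule added to runQueue boils down to the following pure function (an illustrative sketch, not the scheduler's actual API): after a fresh 503, cut the limit to just below the observed concurrency; otherwise, once the quiet period has passed, let it grow by roughly 10%.

// nextMaxConcurrency is a hypothetical helper mirroring the logic above.
// cur is the current limit (0 = unlimited); running is len(running).
func nextMaxConcurrency(cur, running int, saw503, quietPeriodOver bool) int {
	if saw503 {
		// Aim just below the level where 503s appeared.
		if running-1 < 1 {
			return 1
		}
		return running - 1
	}
	if cur > 0 && quietPeriodOver {
		// No recent 503s: allow ~10% headroom above current workload.
		if raised := running*11/10 + 1; raised > cur {
			return raised
		}
	}
	return cur
}
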
index 5b5fa960a1f5e167b0175ffcabff3873ac8419a1..855399337589121c0e26d361febcde3b897fe33e 100644 (file)
@@ -52,7 +52,14 @@ type stubPool struct {
 func (p *stubPool) AtQuota() bool {
        p.Lock()
        defer p.Unlock()
-       return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
+       n := len(p.running)
+       for _, nn := range p.unalloc {
+               n += nn
+       }
+       for _, nn := range p.unknown {
+               n += nn
+       }
+       return n >= p.quota
 }
 func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
 func (p *stubPool) Unsubscribe(<-chan struct{}) {}
@@ -188,7 +195,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                running:   map[string]time.Time{},
                canCreate: 0,
        }
-       New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
        c.Check(pool.running, check.HasLen, 1)
@@ -201,12 +208,8 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 // call Create().
 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
        ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
-       for quota := 1; quota < 3; quota++ {
+       for quota := 1; quota <= 3; quota++ {
                c.Logf("quota=%d", quota)
-               shouldCreate := []arvados.InstanceType{}
-               for i := 1; i < quota; i++ {
-                       shouldCreate = append(shouldCreate, test.InstanceType(3))
-               }
                queue := test.Queue{
                        ChooseType: chooseType,
                        Containers: []arvados.Container{
@@ -244,22 +247,34 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        starts:    []string{},
                        canCreate: 0,
                }
-               sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
-               sch.runQueue()
+               sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
                sch.sync()
                sch.runQueue()
                sch.sync()
-               c.Check(pool.creates, check.DeepEquals, shouldCreate)
-               if len(shouldCreate) == 0 {
-                       c.Check(pool.starts, check.DeepEquals, []string{})
-               } else {
+               switch quota {
+               case 1, 2:
+                       // Can't create a type3 node for ctr3, so we
+                       // shut down an unallocated node (type2), and
+                       // unlock both containers.
+                       c.Check(pool.starts, check.HasLen, 0)
+                       c.Check(pool.shutdowns, check.Equals, 1)
+                       c.Check(pool.creates, check.HasLen, 0)
+                       c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
+                               {UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
+                               {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
+                       })
+               case 3:
+                       // Creating a type3 instance works, so we
+                       // start ctr2 on a type2 instance, and leave
+                       // ctr3 locked while we wait for the new
+                       // instance to come up.
                        c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
+                       c.Check(pool.shutdowns, check.Equals, 0)
+                       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
+                       c.Check(queue.StateChanges(), check.HasLen, 0)
+               default:
+                       panic("test not written for quota>3")
                }
-               c.Check(pool.shutdowns, check.Equals, 3-quota)
-               c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
-                       {UUID: "zzzzz-dz642-000000000000003", From: "Locked", To: "Queued"},
-                       {UUID: "zzzzz-dz642-000000000000002", From: "Locked", To: "Queued"},
-               })
        }
 }
 
@@ -293,24 +308,24 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
        pool := stubPool{
                quota: 2,
                unalloc: map[arvados.InstanceType]int{
-                       test.InstanceType(3): 1,
+                       test.InstanceType(3): 2,
                },
                idle: map[arvados.InstanceType]int{
-                       test.InstanceType(3): 1,
+                       test.InstanceType(3): 2,
                },
                running:   map[string]time.Time{},
                creates:   []arvados.InstanceType{},
                starts:    []string{},
-               canCreate: 1,
+               canCreate: 0,
        }
-       sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+       sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
        for i := 0; i < 30; i++ {
                sch.runQueue()
                sch.sync()
                time.Sleep(time.Millisecond)
        }
        c.Check(pool.shutdowns, check.Equals, 0)
-       c.Check(pool.starts, check.HasLen, 1)
+       c.Check(pool.starts, check.HasLen, 2)
        unlocked := map[string]int{}
        for _, chg := range queue.StateChanges() {
                if chg.To == arvados.ContainerStateQueued {
@@ -405,7 +420,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                },
        }
        queue.Update()
-       New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
        running := map[string]bool{}
@@ -449,7 +464,7 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
                },
        }
        queue.Update()
-       sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+       sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
        c.Check(pool.running, check.HasLen, 1)
        sch.sync()
        for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
@@ -482,7 +497,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
        pool := stubPool{
                unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
        }
-       sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+       sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
        sch.runQueue()
        sch.updateMetrics()
 
@@ -494,7 +509,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
        // 'over quota' metric will be 1 because no workers are available and canCreate defaults
        // to zero.
        pool = stubPool{}
-       sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+       sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
        sch.runQueue()
        sch.updateMetrics()
 
@@ -527,7 +542,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
                unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
                running: map[string]time.Time{},
        }
-       sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+       sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
        sch.runQueue()
        sch.updateMetrics()
 
index c3e67dd11f70a4e00c8a74f59826efb13bf0e35c..4644dc4ea4db00782b38589f546f5cb22d577e88 100644 (file)
@@ -31,6 +31,7 @@ import (
 // shuts down idle workers, in case they are consuming quota.
 type Scheduler struct {
        logger              logrus.FieldLogger
+       client              *arvados.Client
        queue               ContainerQueue
        pool                WorkerPool
        reg                 *prometheus.Registry
@@ -45,18 +46,24 @@ type Scheduler struct {
        stop    chan struct{}
        stopped chan struct{}
 
+       last503time    time.Time // last time API responded 503
+       maxConcurrency int       // dynamic container limit (0 = unlimited), see runQueue()
+
        mContainersAllocatedNotStarted   prometheus.Gauge
        mContainersNotAllocatedOverQuota prometheus.Gauge
        mLongestWaitTimeSinceQueue       prometheus.Gauge
+       mLast503Time                     prometheus.Gauge
+       mMaxContainerConcurrency         prometheus.Gauge
 }
 
 // New returns a new unstarted Scheduler.
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
        sch := &Scheduler{
                logger:              ctxlog.FromContext(ctx),
+               client:              client,
                queue:               queue,
                pool:                pool,
                reg:                 reg,
@@ -96,6 +103,20 @@ func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
                Help:      "Current longest wait time of any container since queuing, and before the start of crunch-run.",
        })
        reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+       sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "last_503_time",
+               Help:      "Time of most recent 503 error received from API.",
+       })
+       reg.MustRegister(sch.mLast503Time)
+       sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "max_concurrent_containers",
+               Help:      "Dynamically assigned limit on number of containers scheduled concurrently, set after receiving 503 errors from API.",
+       })
+       reg.MustRegister(sch.mMaxContainerConcurrency)
 }
 
 func (sch *Scheduler) updateMetrics() {
index a3ff0636e1cd9e7eec69beacc1956c3fa3db08c9..df254cd32e5522aa2289d1823e1a96adec7464ab 100644 (file)
@@ -48,7 +48,7 @@ func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
        ents, _ := queue.Entries()
        c.Check(ents, check.HasLen, 1)
 
-       sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+       sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
        sch.sync()
 
        ents, _ = queue.Entries()
@@ -80,7 +80,7 @@ func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
        ents, _ := queue.Entries()
        c.Check(ents, check.HasLen, 1)
 
-       sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+       sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 
        // Sync shouldn't cancel the container because it might be
        // running on the VM with state=="unknown".
index 3abcba6c7365766cf0c7d38315d47dc1292a03e1..c270eef4943eb48f44ba264ac2dee472e747e5b5 100644 (file)
@@ -397,10 +397,15 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       wkr := wp.workers[inst.ID()]
+       wkr, ok := wp.workers[inst.ID()]
+       if !ok {
+               // race: inst was removed from the pool
+               return
+       }
        if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
-               // the node is not in booting state (can happen if a-d-c is restarted) OR
-               // this is not the first SSH connection
+               // the node is not in booting state (can happen if
+               // a-d-c is restarted) OR this is not the first SSH
+               // connection
                return
        }
 
index 5a498b01f0c98716ae0c34466272bf43ea7bf812..05176214ae1eb188fd4c8a0113b76e115f095a99 100644 (file)
@@ -23,6 +23,7 @@ import (
        "os"
        "regexp"
        "strings"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/httpserver"
@@ -76,6 +77,14 @@ type Client struct {
        // APIHost and AuthToken were loaded from ARVADOS_* env vars
        // (used to customize "no host/token" error messages)
        loadedFromEnv bool
+
+       // Track/limit concurrent outgoing API calls. Note this
+       // differs from an outgoing connection limit (a feature
+       // provided by http.Transport) when concurrent calls are
+       // multiplexed on a single http2 connection.
+       requestLimiter requestLimiter
+
+       last503 atomic.Value
 }
 
 // InsecureHTTPClient is the default http.Client used by a Client with
@@ -220,10 +229,12 @@ func NewClientFromEnv() *Client {
 
 var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
 
-// Do adds Authorization and X-Request-Id headers and then calls
+// Do adds Authorization and X-Request-Id headers, delays in order to
+// comply with rate-limiting restrictions, and then calls
 // (*http.Client)Do().
 func (c *Client) Do(req *http.Request) (*http.Response, error) {
-       if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+       ctx := req.Context()
+       if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
                req.Header.Add("Authorization", auth)
        } else if c.AuthToken != "" {
                req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
@@ -231,7 +242,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 
        if req.Header.Get("X-Request-Id") == "" {
                var reqid string
-               if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+               if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
                        reqid = ctxreqid
                } else if c.defaultRequestID != "" {
                        reqid = c.defaultRequestID
@@ -246,23 +257,48 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
        }
        var cancel context.CancelFunc
        if c.Timeout > 0 {
-               ctx := req.Context()
                ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
                req = req.WithContext(ctx)
+       } else {
+               cancel = context.CancelFunc(func() {})
+       }
+
+       c.requestLimiter.Acquire(ctx)
+       if ctx.Err() != nil {
+               c.requestLimiter.Release()
+               return nil, ctx.Err()
+       }
+
+       // Attach Release() to cancel func, see cancelOnClose below.
+       cancelOrig := cancel
+       cancel = func() {
+               c.requestLimiter.Release()
+               cancelOrig()
        }
+
        resp, err := c.httpClient().Do(req)
-       if err == nil && cancel != nil {
+       if c.requestLimiter.Report(resp, err) {
+               c.last503.Store(time.Now())
+       }
+       if err == nil {
                // We need to call cancel() eventually, but we can't
                // use "defer cancel()" because the context has to
                // stay alive until the caller has finished reading
                // the response body.
                resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
-       } else if cancel != nil {
+       } else {
                cancel()
        }
        return resp, err
 }
 
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+       t, _ := c.last503.Load().(time.Time)
+       return t
+}
+
 // cancelOnClose calls a provided CancelFunc when its wrapped
 // ReadCloser's Close() method is called.
 type cancelOnClose struct {
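
On the consumer side, the new Last503 accessor is meant to be polled the way the scheduler does above. A minimal standalone sketch (the polling loop, interval, and variable names are illustrative):

package main

import (
	"fmt"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	client := arvados.NewClientFromEnv()
	var lastSeen time.Time
	for range time.Tick(10 * time.Second) {
		// Last503 returns the zero time until a 503 has been seen,
		// so t.After(lastSeen) only fires on a new occurrence.
		if t := client.Last503(); t.After(lastSeen) {
			lastSeen = t
			fmt.Println("API returned 503 since the last check; reduce concurrency")
		}
	}
}
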
diff --git a/sdk/go/arvados/limiter.go b/sdk/go/arvados/limiter.go
new file mode 100644 (file)
index 0000000..f62264c
--- /dev/null
@@ -0,0 +1,146 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+       "context"
+       "errors"
+       "net/http"
+       "net/url"
+       "sync"
+       "time"
+)
+
+var requestLimiterQuietPeriod = time.Second
+
+type requestLimiter struct {
+       current    int64
+       limit      int64
+       lock       sync.Mutex
+       cond       *sync.Cond
+       quietUntil time.Time
+}
+
+// Acquire reserves one request slot, waiting if necessary.
+//
+// Acquire returns early if ctx cancels before a slot is available. It
+// is assumed in this case the caller will immediately notice
+// ctx.Err() != nil and call Release().
+func (rl *requestLimiter) Acquire(ctx context.Context) {
+       rl.lock.Lock()
+       if rl.cond == nil {
+               // First use of requestLimiter. Initialize.
+               rl.cond = sync.NewCond(&rl.lock)
+       }
+       // Wait out the quiet period(s) immediately following a 503.
+       for ctx.Err() == nil {
+               delay := rl.quietUntil.Sub(time.Now())
+               if delay < 0 {
+                       break
+               }
+               // Wait for the end of the quiet period, which started
+               // when we last received a 503 response.
+               rl.lock.Unlock()
+               timer := time.NewTimer(delay)
+               select {
+               case <-timer.C:
+               case <-ctx.Done():
+                       timer.Stop()
+               }
+               rl.lock.Lock()
+       }
+       ready := make(chan struct{})
+       go func() {
+               // close ready when a slot is available _or_ we wake
+               // up and find ctx has been canceled (meaning Acquire
+               // has already returned, or is about to).
+               for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
+                       rl.cond.Wait()
+               }
+               close(ready)
+       }()
+       select {
+       case <-ready:
+               // Wait() returned, so we have the lock.
+               rl.current++
+               rl.lock.Unlock()
+       case <-ctx.Done():
+               // When Wait() returns the lock to our goroutine
+               // (which might have already happened) we need to
+               // release it (if we don't do this now, the following
+               // Lock() can deadlock).
+               go func() {
+                       <-ready
+                       rl.lock.Unlock()
+               }()
+               // Note we may have current > limit until the caller
+               // calls Release().
+               rl.lock.Lock()
+               rl.current++
+               rl.lock.Unlock()
+       }
+}
+
+// Release releases a slot that has been reserved with Acquire.
+func (rl *requestLimiter) Release() {
+       rl.lock.Lock()
+       rl.current--
+       rl.lock.Unlock()
+       rl.cond.Signal()
+}
+
+// Report uses the return values from (*http.Client)Do() to adjust the
+// outgoing request limit (increase on success, decrease on 503).
+//
+// Return value is true if the response was a 503.
+func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
+       rl.lock.Lock()
+       defer rl.lock.Unlock()
+       is503 := false
+       if err != nil {
+               uerr := &url.Error{}
+               if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
+                       // This is how http.Client reports 503 from proxy server
+                       is503 = true
+               } else {
+                       return false
+               }
+       } else {
+               is503 = resp.StatusCode == http.StatusServiceUnavailable
+       }
+       if is503 {
+               if rl.limit == 0 {
+                       // Concurrency was unlimited until now.
+                       // Calculate new limit based on actual
+                       // concurrency instead of previous limit.
+                       rl.limit = rl.current
+               }
+               if time.Now().After(rl.quietUntil) {
+                       // Reduce concurrency limit by half.
+                       rl.limit = (rl.limit + 1) / 2
+                       // Don't start any new calls (or reduce the
+                       // limit even further on additional 503s) for
+                       // a second.
+                       rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
+               }
+               return true
+       }
+       if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
+               // After each non-server-error response, increase
+               // concurrency limit by at least 10% -- but not beyond
+               // 2x the highest concurrency level we've seen without
+               // a failure.
+               increase := rl.limit / 10
+               if increase < 1 {
+                       increase = 1
+               }
+               rl.limit += increase
+               if max := rl.current * 2; max > rl.limit {
+                       rl.limit = max
+               }
+               rl.cond.Broadcast()
+       }
+       return false
+}
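
The intended call pattern around requestLimiter, mirroring the (*Client)Do changes above, is roughly the following. This is a sketch that assumes it sits in the arvados package next to limiter.go; doThrottled is hypothetical, and the real client defers Release until the caller closes the response body (see cancelOnClose).

package arvados

import (
	"context"
	"net/http"
)

// doThrottled is a hypothetical helper showing the Acquire/Report/Release
// sequence used by (*Client)Do.
func doThrottled(ctx context.Context, rl *requestLimiter, hc *http.Client, req *http.Request) (*http.Response, error) {
	rl.Acquire(ctx) // may block through a post-503 quiet period
	if ctx.Err() != nil {
		rl.Release()
		return nil, ctx.Err()
	}
	resp, err := hc.Do(req.WithContext(ctx))
	rl.Report(resp, err) // halves the limit and starts a quiet period on 503
	if err != nil {
		rl.Release()
		return nil, err
	}
	// Releasing here keeps the sketch short; the real code keeps the
	// slot until the caller closes resp.Body.
	rl.Release()
	return resp, nil
}
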
diff --git a/sdk/go/arvados/limiter_test.go b/sdk/go/arvados/limiter_test.go
new file mode 100644 (file)
index 0000000..d32ab96
--- /dev/null
@@ -0,0 +1,110 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+       "context"
+       "errors"
+       "net/http"
+       "sync"
+       "time"
+
+       . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&limiterSuite{})
+
+type limiterSuite struct{}
+
+func (*limiterSuite) TestUnlimitedBeforeFirstReport(c *C) {
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+       defer cancel()
+       rl := requestLimiter{}
+
+       var wg sync.WaitGroup
+       wg.Add(1000)
+       for i := 0; i < 1000; i++ {
+               go func() {
+                       rl.Acquire(ctx)
+                       wg.Done()
+               }()
+       }
+       wg.Wait()
+       c.Check(rl.current, Equals, int64(1000))
+       wg.Add(1000)
+       for i := 0; i < 1000; i++ {
+               go func() {
+                       rl.Release()
+                       wg.Done()
+               }()
+       }
+       wg.Wait()
+       c.Check(rl.current, Equals, int64(0))
+}
+
+func (*limiterSuite) TestCancelWhileWaitingForAcquire(c *C) {
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+       defer cancel()
+       rl := requestLimiter{}
+
+       rl.limit = 1
+       rl.Acquire(ctx)
+       ctxShort, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond))
+       defer cancel()
+       rl.Acquire(ctxShort)
+       c.Check(rl.current, Equals, int64(2))
+       c.Check(ctxShort.Err(), NotNil)
+       rl.Release()
+       rl.Release()
+       c.Check(rl.current, Equals, int64(0))
+}
+
+func (*limiterSuite) TestReducedLimitAndQuietPeriod(c *C) {
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+       defer cancel()
+       rl := requestLimiter{}
+
+       // Use a short quiet period to make tests faster
+       defer func(orig time.Duration) { requestLimiterQuietPeriod = orig }(requestLimiterQuietPeriod)
+       requestLimiterQuietPeriod = time.Second / 10
+
+       for i := 0; i < 5; i++ {
+               rl.Acquire(ctx)
+       }
+       rl.Report(&http.Response{StatusCode: http.StatusServiceUnavailable}, nil)
+       c.Check(rl.limit, Equals, int64(3))
+       for i := 0; i < 5; i++ {
+               rl.Release()
+       }
+
+       // Even with all slots released, we can't Acquire in the quiet
+       // period.
+
+       // (a) If our context expires before the end of the quiet
+       // period, we get back DeadlineExceeded -- without waiting for
+       // the end of the quiet period.
+       acquire := time.Now()
+       ctxShort, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod/10))
+       defer cancel()
+       rl.Acquire(ctxShort)
+       c.Check(ctxShort.Err(), Equals, context.DeadlineExceeded)
+       c.Check(time.Since(acquire) < requestLimiterQuietPeriod/2, Equals, true)
+       c.Check(rl.quietUntil.Sub(time.Now()) > requestLimiterQuietPeriod/2, Equals, true)
+       rl.Release()
+
+       // (b) If our context does not expire first, Acquire waits for
+       // the end of the quiet period.
+       ctxLong, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod*2))
+       defer cancel()
+       acquire = time.Now()
+       rl.Acquire(ctxLong)
+       c.Check(time.Since(acquire) > requestLimiterQuietPeriod/10, Equals, true)
+       c.Check(time.Since(acquire) < requestLimiterQuietPeriod, Equals, true)
+       c.Check(ctxLong.Err(), IsNil)
+       rl.Release()
+
+       // OK to call Report() with nil Response and non-nil error.
+       rl.Report(nil, errors.New("network error"))
+}