20511: Start with 8 concurrent outgoing API calls, not unlimited.
author Tom Clegg <tom@curii.com>
Thu, 25 May 2023 14:12:40 +0000 (10:12 -0400)
committer Tom Clegg <tom@curii.com>
Thu, 25 May 2023 14:12:40 +0000 (10:12 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvados/limiter.go
sdk/go/arvados/limiter_test.go

index f62264c636f96dfa4b55ff0d581fc8cd50b05c09..ac244593b2e4ba98309b593c8b0a809a623e70f7 100644 (file)
@@ -13,7 +13,10 @@ import (
        "time"
 )
 
-var requestLimiterQuietPeriod = time.Second
+var (
+       requestLimiterQuietPeriod        = time.Second
+       requestLimiterInitialLimit int64 = 8
+)
 
 type requestLimiter struct {
        current    int64
@@ -33,6 +36,7 @@ func (rl *requestLimiter) Acquire(ctx context.Context) {
        if rl.cond == nil {
                // First use of requestLimiter. Initialize.
                rl.cond = sync.NewCond(&rl.lock)
+               rl.limit = requestLimiterInitialLimit
        }
        // Wait out the quiet period(s) immediately following a 503.
        for ctx.Err() == nil {
index d32ab9699999d97a7dad2c63c4332e937419f739..1e73b1c28f44555da1be00ca926ccc7e9c0f7946 100644 (file)
@@ -18,23 +18,23 @@ var _ = Suite(&limiterSuite{})
 
 type limiterSuite struct{}
 
-func (*limiterSuite) TestUnlimitedBeforeFirstReport(c *C) {
+func (*limiterSuite) TestInitialLimit(c *C) {
        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
        defer cancel()
        rl := requestLimiter{}
 
        var wg sync.WaitGroup
-       wg.Add(1000)
-       for i := 0; i < 1000; i++ {
+       wg.Add(int(requestLimiterInitialLimit))
+       for i := int64(0); i < requestLimiterInitialLimit; i++ {
                go func() {
                        rl.Acquire(ctx)
                        wg.Done()
                }()
        }
        wg.Wait()
-       c.Check(rl.current, Equals, int64(1000))
-       wg.Add(1000)
-       for i := 0; i < 1000; i++ {
+       c.Check(rl.current, Equals, requestLimiterInitialLimit)
+       wg.Add(int(requestLimiterInitialLimit))
+       for i := int64(0); i < requestLimiterInitialLimit; i++ {
                go func() {
                        rl.Release()
                        wg.Done()
@@ -49,8 +49,8 @@ func (*limiterSuite) TestCancelWhileWaitingForAcquire(c *C) {
        defer cancel()
        rl := requestLimiter{}
 
-       rl.limit = 1
        rl.Acquire(ctx)
+       rl.limit = 1
        ctxShort, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond))
        defer cancel()
        rl.Acquire(ctxShort)
@@ -74,7 +74,7 @@ func (*limiterSuite) TestReducedLimitAndQuietPeriod(c *C) {
                rl.Acquire(ctx)
        }
        rl.Report(&http.Response{StatusCode: http.StatusServiceUnavailable}, nil)
-       c.Check(rl.limit, Equals, int64(3))
+       c.Check(rl.limit, Equals, requestLimiterInitialLimit/2)
        for i := 0; i < 5; i++ {
                rl.Release()
        }