From 6bcd1910537f4f24eebb21995e770d7ea6153462 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 25 May 2023 10:12:40 -0400 Subject: [PATCH] 20511: Start with 8 concurrent outgoing API calls, not unlimited. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/arvados/limiter.go | 6 +++++- sdk/go/arvados/limiter_test.go | 16 ++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sdk/go/arvados/limiter.go b/sdk/go/arvados/limiter.go index f62264c636..ac244593b2 100644 --- a/sdk/go/arvados/limiter.go +++ b/sdk/go/arvados/limiter.go @@ -13,7 +13,10 @@ import ( "time" ) -var requestLimiterQuietPeriod = time.Second +var ( + requestLimiterQuietPeriod = time.Second + requestLimiterInitialLimit int64 = 8 +) type requestLimiter struct { current int64 @@ -33,6 +36,7 @@ func (rl *requestLimiter) Acquire(ctx context.Context) { if rl.cond == nil { // First use of requestLimiter. Initialize. rl.cond = sync.NewCond(&rl.lock) + rl.limit = requestLimiterInitialLimit } // Wait out the quiet period(s) immediately following a 503. for ctx.Err() == nil { diff --git a/sdk/go/arvados/limiter_test.go b/sdk/go/arvados/limiter_test.go index d32ab96999..1e73b1c28f 100644 --- a/sdk/go/arvados/limiter_test.go +++ b/sdk/go/arvados/limiter_test.go @@ -18,23 +18,23 @@ var _ = Suite(&limiterSuite{}) type limiterSuite struct{} -func (*limiterSuite) TestUnlimitedBeforeFirstReport(c *C) { +func (*limiterSuite) TestInitialLimit(c *C) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) defer cancel() rl := requestLimiter{} var wg sync.WaitGroup - wg.Add(1000) - for i := 0; i < 1000; i++ { + wg.Add(int(requestLimiterInitialLimit)) + for i := int64(0); i < requestLimiterInitialLimit; i++ { go func() { rl.Acquire(ctx) wg.Done() }() } wg.Wait() - c.Check(rl.current, Equals, int64(1000)) - wg.Add(1000) - for i := 0; i < 1000; i++ { + c.Check(rl.current, Equals, requestLimiterInitialLimit) + wg.Add(int(requestLimiterInitialLimit)) + for i := int64(0); i < requestLimiterInitialLimit; i++ { go func() { rl.Release() wg.Done() @@ -49,8 +49,8 @@ func (*limiterSuite) TestCancelWhileWaitingForAcquire(c *C) { defer cancel() rl := requestLimiter{} - rl.limit = 1 rl.Acquire(ctx) + rl.limit = 1 ctxShort, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond)) defer cancel() rl.Acquire(ctxShort) @@ -74,7 +74,7 @@ func (*limiterSuite) TestReducedLimitAndQuietPeriod(c *C) { rl.Acquire(ctx) } rl.Report(&http.Response{StatusCode: http.StatusServiceUnavailable}, nil) - c.Check(rl.limit, Equals, int64(3)) + c.Check(rl.limit, Equals, requestLimiterInitialLimit/2) for i := 0; i < 5; i++ { rl.Release() } -- 2.30.2