X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/69dcc65ee6b5832f7671c68051dd792981369da0..05ffd8f0f13345044d5ffb4405949794cb316bd6:/sdk/go/arvados/limiter.go diff --git a/sdk/go/arvados/limiter.go b/sdk/go/arvados/limiter.go index ecbe0b1801..dc944160ab 100644 --- a/sdk/go/arvados/limiter.go +++ b/sdk/go/arvados/limiter.go @@ -6,16 +6,22 @@ package arvados import ( "context" + "errors" "net/http" + "net/url" "sync" "time" ) -var requestLimiterQuietPeriod = time.Second +var ( + requestLimiterQuietPeriod = time.Second + requestLimiterInitialLimit int64 = 8 +) type requestLimiter struct { current int64 limit int64 + maxlimit int64 lock sync.Mutex cond *sync.Cond quietUntil time.Time @@ -31,6 +37,7 @@ func (rl *requestLimiter) Acquire(ctx context.Context) { if rl.cond == nil { // First use of requestLimiter. Initialize. rl.cond = sync.NewCond(&rl.lock) + rl.limit = requestLimiterInitialLimit } // Wait out the quiet period(s) immediately following a 503. for ctx.Err() == nil { @@ -91,13 +98,24 @@ func (rl *requestLimiter) Release() { // Report uses the return values from (*http.Client)Do() to adjust the // outgoing request limit (increase on success, decrease on 503). -func (rl *requestLimiter) Report(resp *http.Response, err error) { - if err != nil { - return - } +// +// Return value is true if the response was a 503. +func (rl *requestLimiter) Report(resp *http.Response, err error) bool { rl.lock.Lock() defer rl.lock.Unlock() - if resp.StatusCode == http.StatusServiceUnavailable { + is503 := false + if err != nil { + uerr := &url.Error{} + if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" { + // This is how http.Client reports 503 from proxy server + is503 = true + } else { + return false + } + } else { + is503 = resp.StatusCode == http.StatusServiceUnavailable + } + if is503 { if rl.limit == 0 { // Concurrency was unlimited until now. // Calculate new limit based on actual @@ -112,7 +130,9 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) { // a second. rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod) } - } else if resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 { + return true + } + if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 { // After each non-server-error response, increase // concurrency limit by at least 10% -- but not beyond // 2x the highest concurrency level we've seen without @@ -122,9 +142,13 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) { increase = 1 } rl.limit += increase - if max := rl.current * 2; max > rl.limit { + if max := rl.current * 2; max < rl.limit { rl.limit = max } + if rl.maxlimit > 0 && rl.maxlimit < rl.limit { + rl.limit = rl.maxlimit + } rl.cond.Broadcast() } + return false }