21227: Use a separate global requestLimiter for each target host.
[arvados.git] / sdk / go / arvados / limiter.go
index ecbe0b18012528929a238297b7b82e4b745138ad..dc944160ab2dd5d459a31fa2386ebe9a5f6bb2c3 100644 (file)
@@ -6,16 +6,22 @@ package arvados
 
 import (
        "context"
+       "errors"
        "net/http"
+       "net/url"
        "sync"
        "time"
 )
 
-var requestLimiterQuietPeriod = time.Second
+var (
+       requestLimiterQuietPeriod        = time.Second
+       requestLimiterInitialLimit int64 = 8
+)
 
 type requestLimiter struct {
        current    int64
        limit      int64
+       maxlimit   int64
        lock       sync.Mutex
        cond       *sync.Cond
        quietUntil time.Time
@@ -31,6 +37,7 @@ func (rl *requestLimiter) Acquire(ctx context.Context) {
        if rl.cond == nil {
                // First use of requestLimiter. Initialize.
                rl.cond = sync.NewCond(&rl.lock)
+               rl.limit = requestLimiterInitialLimit
        }
        // Wait out the quiet period(s) immediately following a 503.
        for ctx.Err() == nil {
@@ -91,13 +98,24 @@ func (rl *requestLimiter) Release() {
 
 // Report uses the return values from (*http.Client)Do() to adjust the
 // outgoing request limit (increase on success, decrease on 503).
-func (rl *requestLimiter) Report(resp *http.Response, err error) {
-       if err != nil {
-               return
-       }
+//
+// Return value is true if the response was a 503.
+func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
        rl.lock.Lock()
        defer rl.lock.Unlock()
-       if resp.StatusCode == http.StatusServiceUnavailable {
+       is503 := false
+       if err != nil {
+               uerr := &url.Error{}
+               if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
+                       // This is how http.Client reports 503 from proxy server
+                       is503 = true
+               } else {
+                       return false
+               }
+       } else {
+               is503 = resp.StatusCode == http.StatusServiceUnavailable
+       }
+       if is503 {
                if rl.limit == 0 {
                        // Concurrency was unlimited until now.
                        // Calculate new limit based on actual
@@ -112,7 +130,9 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) {
                        // a second.
                        rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
                }
-       } else if resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
+               return true
+       }
+       if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
                // After each non-server-error response, increase
                // concurrency limit by at least 10% -- but not beyond
                // 2x the highest concurrency level we've seen without
@@ -122,9 +142,13 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) {
                        increase = 1
                }
                rl.limit += increase
-               if max := rl.current * 2; max > rl.limit {
+               if max := rl.current * 2; max < rl.limit {
                        rl.limit = max
                }
+               if rl.maxlimit > 0 && rl.maxlimit < rl.limit {
+                       rl.limit = rl.maxlimit
+               }
                rl.cond.Broadcast()
        }
+       return false
 }