1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
17 requestLimiterQuietPeriod = time.Second
18 requestLimiterInitialLimit int64 = 8
19 requestLimiterMinimumLimit int64 = 4
22 type requestLimiter struct {
31 // Acquire reserves one request slot, waiting if necessary.
33 // Acquire returns early if ctx is canceled before a slot is available. It
34 // is assumed in this case the caller will immediately notice
35 // ctx.Err() != nil and call Release().
36 func (rl *requestLimiter) Acquire(ctx context.Context) {
39 // First use of requestLimiter. Initialize.
40 rl.cond = sync.NewCond(&rl.lock)
41 rl.limit = requestLimiterInitialLimit
43 // Wait out the quiet period(s) immediately following a 503.
44 for ctx.Err() == nil {
45 delay := rl.quietUntil.Sub(time.Now())
49 // Wait for the end of the quiet period, which started
50 // when we last received a 503 response.
52 timer := time.NewTimer(delay)
60 ready := make(chan struct{})
62 // close ready when a slot is available _or_ we wake
63 // up and find ctx has been canceled (meaning Acquire
64 // has already returned, or is about to).
65 for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
72 // Wait() returned, so we have the lock.
76 // When Wait() returns the lock to our goroutine
77 // (which might have already happened) we need to
78 // release it (if we don't do this now, the following
79 // Lock() can deadlock).
84 // Note we may have current > limit until the caller
92 // Release releases a slot that has been reserved with Acquire.
93 func (rl *requestLimiter) Release() {
100 // Report uses the return values from (*http.Client).Do() to adjust the
101 // outgoing request limit (increase on success, decrease on 503).
103 // Return value is true if the response was a 503.
104 func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
106 defer rl.lock.Unlock()
110 if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
111 // This is how http.Client reports 503 from proxy server
117 is503 = resp.StatusCode == http.StatusServiceUnavailable
121 // Concurrency was unlimited until now.
122 // Calculate new limit based on actual
123 // concurrency instead of previous limit.
124 rl.limit = rl.current
126 if time.Now().After(rl.quietUntil) {
127 // Reduce concurrency limit by half.
128 rl.limit = (rl.limit + 1) / 2
129 // Don't start any new calls (or reduce the
130 // limit even further on additional 503s) for
132 rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
136 if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
137 // After each non-server-error response, increase
138 // concurrency limit by at least 10% -- but not beyond
139 // 2x the highest concurrency level we've seen without
141 increase := rl.limit / 10
146 if max := rl.current * 2; max < rl.limit {
149 if min := requestLimiterMinimumLimit; min > rl.limit {
150 // If limit is too low, programs like
151 // controller and test suites can end up with
152 // too few slots to complete a single request.
155 if rl.maxlimit > 0 && rl.maxlimit < rl.limit {
156 rl.limit = rl.maxlimit