1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
17 requestLimiterQuietPeriod = time.Second
18 requestLimiterInitialLimit int64 = 8
21 type requestLimiter struct {
30 // Acquire reserves one request slot, waiting if necessary.
32 // Acquire returns early if ctx cancels before a slot is available. It
33 // is assumed in this case the caller will immediately notice
34 // ctx.Err() != nil and call Release().
35 func (rl *requestLimiter) Acquire(ctx context.Context) {
38 // First use of requestLimiter. Initialize.
39 rl.cond = sync.NewCond(&rl.lock)
40 rl.limit = requestLimiterInitialLimit
42 // Wait out the quiet period(s) immediately following a 503.
43 for ctx.Err() == nil {
44 delay := rl.quietUntil.Sub(time.Now())
48 // Wait for the end of the quiet period, which started
49 // when we last received a 503 response.
51 timer := time.NewTimer(delay)
59 ready := make(chan struct{})
61 // close ready when a slot is available _or_ we wake
62 // up and find ctx has been canceled (meaning Acquire
63 // has already returned, or is about to).
64 for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
71 // Wait() returned, so we have the lock.
75 // When Wait() returns the lock to our goroutine
76 // (which might have already happened) we need to
77 // release it (if we don't do this now, the following
78 // Lock() can deadlock).
83 // Note we may have current > limit until the caller
91 // Release releases a slot that has been reserved with Acquire.
92 func (rl *requestLimiter) Release() {
99 // Report uses the return values from (*http.Client)Do() to adjust the
100 // outgoing request limit (increase on success, decrease on 503).
102 // Return value is true if the response was a 503.
103 func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
105 defer rl.lock.Unlock()
109 if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
110 // This is how http.Client reports 503 from proxy server
116 is503 = resp.StatusCode == http.StatusServiceUnavailable
120 // Concurrency was unlimited until now.
121 // Calculate new limit based on actual
122 // concurrency instead of previous limit.
123 rl.limit = rl.current
125 if time.Now().After(rl.quietUntil) {
126 // Reduce concurrency limit by half.
127 rl.limit = (rl.limit + 1) / 2
128 // Don't start any new calls (or reduce the
129 // limit even further on additional 503s) for
131 rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
135 if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
136 // After each non-server-error response, increase
137 // concurrency limit by at least 10% -- but not beyond
138 // 2x the highest concurrency level we've seen without
140 increase := rl.limit / 10
145 if max := rl.current * 2; max < rl.limit {
148 if rl.maxlimit > 0 && rl.maxlimit < rl.limit {
149 rl.limit = rl.maxlimit