21227: Don't reduce outgoing request concurrency below 4.
[arvados.git] / sdk / go / arvados / limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "context"
9         "errors"
10         "net/http"
11         "net/url"
12         "sync"
13         "time"
14 )
15
16 var (
17         requestLimiterQuietPeriod        = time.Second
18         requestLimiterInitialLimit int64 = 8
19         requestLimiterMinimumLimit int64 = 4
20 )
21
22 type requestLimiter struct {
23         current    int64
24         limit      int64
25         maxlimit   int64
26         lock       sync.Mutex
27         cond       *sync.Cond
28         quietUntil time.Time
29 }
30
31 // Acquire reserves one request slot, waiting if necessary.
32 //
33 // Acquire returns early if ctx cancels before a slot is available. It
34 // is assumed in this case the caller will immediately notice
35 // ctx.Err() != nil and call Release().
36 func (rl *requestLimiter) Acquire(ctx context.Context) {
37         rl.lock.Lock()
38         if rl.cond == nil {
39                 // First use of requestLimiter. Initialize.
40                 rl.cond = sync.NewCond(&rl.lock)
41                 rl.limit = requestLimiterInitialLimit
42         }
43         // Wait out the quiet period(s) immediately following a 503.
44         for ctx.Err() == nil {
45                 delay := rl.quietUntil.Sub(time.Now())
46                 if delay < 0 {
47                         break
48                 }
49                 // Wait for the end of the quiet period, which started
50                 // when we last received a 503 response.
51                 rl.lock.Unlock()
52                 timer := time.NewTimer(delay)
53                 select {
54                 case <-timer.C:
55                 case <-ctx.Done():
56                         timer.Stop()
57                 }
58                 rl.lock.Lock()
59         }
60         ready := make(chan struct{})
61         go func() {
62                 // close ready when a slot is available _or_ we wake
63                 // up and find ctx has been canceled (meaning Acquire
64                 // has already returned, or is about to).
65                 for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
66                         rl.cond.Wait()
67                 }
68                 close(ready)
69         }()
70         select {
71         case <-ready:
72                 // Wait() returned, so we have the lock.
73                 rl.current++
74                 rl.lock.Unlock()
75         case <-ctx.Done():
76                 // When Wait() returns the lock to our goroutine
77                 // (which might have already happened) we need to
78                 // release it (if we don't do this now, the following
79                 // Lock() can deadlock).
80                 go func() {
81                         <-ready
82                         rl.lock.Unlock()
83                 }()
84                 // Note we may have current > limit until the caller
85                 // calls Release().
86                 rl.lock.Lock()
87                 rl.current++
88                 rl.lock.Unlock()
89         }
90 }
91
92 // Release releases a slot that has been reserved with Acquire.
93 func (rl *requestLimiter) Release() {
94         rl.lock.Lock()
95         rl.current--
96         rl.lock.Unlock()
97         rl.cond.Signal()
98 }
99
100 // Report uses the return values from (*http.Client)Do() to adjust the
101 // outgoing request limit (increase on success, decrease on 503).
102 //
103 // Return value is true if the response was a 503.
104 func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
105         rl.lock.Lock()
106         defer rl.lock.Unlock()
107         is503 := false
108         if err != nil {
109                 uerr := &url.Error{}
110                 if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
111                         // This is how http.Client reports 503 from proxy server
112                         is503 = true
113                 } else {
114                         return false
115                 }
116         } else {
117                 is503 = resp.StatusCode == http.StatusServiceUnavailable
118         }
119         if is503 {
120                 if rl.limit == 0 {
121                         // Concurrency was unlimited until now.
122                         // Calculate new limit based on actual
123                         // concurrency instead of previous limit.
124                         rl.limit = rl.current
125                 }
126                 if time.Now().After(rl.quietUntil) {
127                         // Reduce concurrency limit by half.
128                         rl.limit = (rl.limit + 1) / 2
129                         // Don't start any new calls (or reduce the
130                         // limit even further on additional 503s) for
131                         // a second.
132                         rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
133                 }
134                 return true
135         }
136         if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
137                 // After each non-server-error response, increase
138                 // concurrency limit by at least 10% -- but not beyond
139                 // 2x the highest concurrency level we've seen without
140                 // a failure.
141                 increase := rl.limit / 10
142                 if increase < 1 {
143                         increase = 1
144                 }
145                 rl.limit += increase
146                 if max := rl.current * 2; max < rl.limit {
147                         rl.limit = max
148                 }
149                 if min := requestLimiterMinimumLimit; min > rl.limit {
150                         // If limit is too low, programs like
151                         // controller and test suites can end up with
152                         // too few slots to complete a single request.
153                         rl.limit = min
154                 }
155                 if rl.maxlimit > 0 && rl.maxlimit < rl.limit {
156                         rl.limit = rl.maxlimit
157                 }
158                 rl.cond.Broadcast()
159         }
160         return false
161 }