21126: Merge branch 'main' into 21126-trash-when-ro
[arvados.git] / sdk / go / arvados / limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "context"
9         "errors"
10         "net/http"
11         "net/url"
12         "sync"
13         "time"
14 )
15
16 var (
17         requestLimiterQuietPeriod        = time.Second
18         requestLimiterInitialLimit int64 = 8
19 )
20
21 type requestLimiter struct {
22         current    int64
23         limit      int64
24         maxlimit   int64
25         lock       sync.Mutex
26         cond       *sync.Cond
27         quietUntil time.Time
28 }
29
30 // Acquire reserves one request slot, waiting if necessary.
31 //
32 // Acquire returns early if ctx cancels before a slot is available. It
33 // is assumed in this case the caller will immediately notice
34 // ctx.Err() != nil and call Release().
35 func (rl *requestLimiter) Acquire(ctx context.Context) {
36         rl.lock.Lock()
37         if rl.cond == nil {
38                 // First use of requestLimiter. Initialize.
39                 rl.cond = sync.NewCond(&rl.lock)
40                 rl.limit = requestLimiterInitialLimit
41         }
42         // Wait out the quiet period(s) immediately following a 503.
43         for ctx.Err() == nil {
44                 delay := rl.quietUntil.Sub(time.Now())
45                 if delay < 0 {
46                         break
47                 }
48                 // Wait for the end of the quiet period, which started
49                 // when we last received a 503 response.
50                 rl.lock.Unlock()
51                 timer := time.NewTimer(delay)
52                 select {
53                 case <-timer.C:
54                 case <-ctx.Done():
55                         timer.Stop()
56                 }
57                 rl.lock.Lock()
58         }
59         ready := make(chan struct{})
60         go func() {
61                 // close ready when a slot is available _or_ we wake
62                 // up and find ctx has been canceled (meaning Acquire
63                 // has already returned, or is about to).
64                 for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
65                         rl.cond.Wait()
66                 }
67                 close(ready)
68         }()
69         select {
70         case <-ready:
71                 // Wait() returned, so we have the lock.
72                 rl.current++
73                 rl.lock.Unlock()
74         case <-ctx.Done():
75                 // When Wait() returns the lock to our goroutine
76                 // (which might have already happened) we need to
77                 // release it (if we don't do this now, the following
78                 // Lock() can deadlock).
79                 go func() {
80                         <-ready
81                         rl.lock.Unlock()
82                 }()
83                 // Note we may have current > limit until the caller
84                 // calls Release().
85                 rl.lock.Lock()
86                 rl.current++
87                 rl.lock.Unlock()
88         }
89 }
90
91 // Release releases a slot that has been reserved with Acquire.
92 func (rl *requestLimiter) Release() {
93         rl.lock.Lock()
94         rl.current--
95         rl.lock.Unlock()
96         rl.cond.Signal()
97 }
98
99 // Report uses the return values from (*http.Client)Do() to adjust the
100 // outgoing request limit (increase on success, decrease on 503).
101 //
102 // Return value is true if the response was a 503.
103 func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
104         rl.lock.Lock()
105         defer rl.lock.Unlock()
106         is503 := false
107         if err != nil {
108                 uerr := &url.Error{}
109                 if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
110                         // This is how http.Client reports 503 from proxy server
111                         is503 = true
112                 } else {
113                         return false
114                 }
115         } else {
116                 is503 = resp.StatusCode == http.StatusServiceUnavailable
117         }
118         if is503 {
119                 if rl.limit == 0 {
120                         // Concurrency was unlimited until now.
121                         // Calculate new limit based on actual
122                         // concurrency instead of previous limit.
123                         rl.limit = rl.current
124                 }
125                 if time.Now().After(rl.quietUntil) {
126                         // Reduce concurrency limit by half.
127                         rl.limit = (rl.limit + 1) / 2
128                         // Don't start any new calls (or reduce the
129                         // limit even further on additional 503s) for
130                         // a second.
131                         rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
132                 }
133                 return true
134         }
135         if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
136                 // After each non-server-error response, increase
137                 // concurrency limit by at least 10% -- but not beyond
138                 // 2x the highest concurrency level we've seen without
139                 // a failure.
140                 increase := rl.limit / 10
141                 if increase < 1 {
142                         increase = 1
143                 }
144                 rl.limit += increase
145                 if max := rl.current * 2; max < rl.limit {
146                         rl.limit = max
147                 }
148                 if rl.maxlimit > 0 && rl.maxlimit < rl.limit {
149                         rl.limit = rl.maxlimit
150                 }
151                 rl.cond.Broadcast()
152         }
153         return false
154 }