19973: Limit arvados.Client request concurrency.
[arvados.git] / sdk / go / arvados / limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "context"
9         "net/http"
10         "sync"
11         "time"
12 )
13
// requestLimiterQuietPeriod is the length of the quiet period that
// begins when a 503 response causes the concurrency limit to be
// reduced. Until it ends, Acquire does not hand out slots and Report
// does not reduce the limit again. It is a variable rather than a
// constant, presumably so tests can shorten it.
var requestLimiterQuietPeriod = 1 * time.Second
15
// requestLimiter caps the number of concurrently outstanding requests
// and adapts that cap to the responses reported by the caller. The
// zero value is ready to use and imposes no limit.
type requestLimiter struct {
	// lock guards current, limit, and quietUntil.
	lock sync.Mutex
	// cond is created on the first call to Acquire, with lock as
	// its Locker. Goroutines waiting for a slot block on it.
	cond *sync.Cond

	// current is the number of slots reserved by Acquire and not
	// yet returned by Release.
	current int64
	// limit is the maximum number of slots handed out at once.
	// Zero means unlimited.
	limit int64
	// quietUntil is the end of the quiet period that follows a
	// 503 response.
	quietUntil time.Time
}
23
24 // Acquire reserves one request slot, waiting if necessary.
25 //
26 // Acquire returns early if ctx cancels before a slot is available. It
27 // is assumed in this case the caller will immediately notice
28 // ctx.Err() != nil and call Release().
29 func (rl *requestLimiter) Acquire(ctx context.Context) {
30         rl.lock.Lock()
31         if rl.cond == nil {
32                 // First use of requestLimiter. Initialize.
33                 rl.cond = sync.NewCond(&rl.lock)
34         }
35         // Wait out the quiet period(s) immediately following a 503.
36         for ctx.Err() == nil {
37                 delay := rl.quietUntil.Sub(time.Now())
38                 if delay < 0 {
39                         break
40                 }
41                 // Wait for the end of the quiet period, which started
42                 // when we last received a 503 response.
43                 rl.lock.Unlock()
44                 timer := time.NewTimer(delay)
45                 select {
46                 case <-timer.C:
47                 case <-ctx.Done():
48                         timer.Stop()
49                 }
50                 rl.lock.Lock()
51         }
52         ready := make(chan struct{})
53         go func() {
54                 // close ready when a slot is available _or_ we wake
55                 // up and find ctx has been canceled (meaning Acquire
56                 // has already returned, or is about to).
57                 for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
58                         rl.cond.Wait()
59                 }
60                 close(ready)
61         }()
62         select {
63         case <-ready:
64                 // Wait() returned, so we have the lock.
65                 rl.current++
66                 rl.lock.Unlock()
67         case <-ctx.Done():
68                 // When Wait() returns the lock to our goroutine
69                 // (which might have already happened) we need to
70                 // release it (if we don't do this now, the following
71                 // Lock() can deadlock).
72                 go func() {
73                         <-ready
74                         rl.lock.Unlock()
75                 }()
76                 // Note we may have current > limit until the caller
77                 // calls Release().
78                 rl.lock.Lock()
79                 rl.current++
80                 rl.lock.Unlock()
81         }
82 }
83
84 // Release releases a slot that has been reserved with Acquire.
85 func (rl *requestLimiter) Release() {
86         rl.lock.Lock()
87         rl.current--
88         rl.lock.Unlock()
89         rl.cond.Signal()
90 }
91
92 // Report uses the return values from (*http.Client)Do() to adjust the
93 // outgoing request limit (increase on success, decrease on 503).
94 func (rl *requestLimiter) Report(resp *http.Response, err error) {
95         if err != nil {
96                 return
97         }
98         rl.lock.Lock()
99         defer rl.lock.Unlock()
100         if resp.StatusCode == http.StatusServiceUnavailable {
101                 if rl.limit == 0 {
102                         // Concurrency was unlimited until now.
103                         // Calculate new limit based on actual
104                         // concurrency instead of previous limit.
105                         rl.limit = rl.current
106                 }
107                 if time.Now().After(rl.quietUntil) {
108                         // Reduce concurrency limit by half.
109                         rl.limit = (rl.limit + 1) / 2
110                         // Don't start any new calls (or reduce the
111                         // limit even further on additional 503s) for
112                         // a second.
113                         rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
114                 }
115         } else if resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
116                 // After each non-server-error response, increase
117                 // concurrency limit by at least 10% -- but not beyond
118                 // 2x the highest concurrency level we've seen without
119                 // a failure.
120                 increase := rl.limit / 10
121                 if increase < 1 {
122                         increase = 1
123                 }
124                 rl.limit += increase
125                 if max := rl.current * 2; max > rl.limit {
126                         rl.limit = max
127                 }
128                 rl.cond.Broadcast()
129         }
130 }