20259: Add documentation for banner and tooltip features
[arvados.git] / sdk / go / arvados / limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "context"
9         "errors"
10         "net/http"
11         "net/url"
12         "sync"
13         "time"
14 )
15
16 var requestLimiterQuietPeriod = time.Second
17
18 type requestLimiter struct {
19         current    int64
20         limit      int64
21         lock       sync.Mutex
22         cond       *sync.Cond
23         quietUntil time.Time
24 }
25
26 // Acquire reserves one request slot, waiting if necessary.
27 //
28 // Acquire returns early if ctx cancels before a slot is available. It
29 // is assumed in this case the caller will immediately notice
30 // ctx.Err() != nil and call Release().
31 func (rl *requestLimiter) Acquire(ctx context.Context) {
32         rl.lock.Lock()
33         if rl.cond == nil {
34                 // First use of requestLimiter. Initialize.
35                 rl.cond = sync.NewCond(&rl.lock)
36         }
37         // Wait out the quiet period(s) immediately following a 503.
38         for ctx.Err() == nil {
39                 delay := rl.quietUntil.Sub(time.Now())
40                 if delay < 0 {
41                         break
42                 }
43                 // Wait for the end of the quiet period, which started
44                 // when we last received a 503 response.
45                 rl.lock.Unlock()
46                 timer := time.NewTimer(delay)
47                 select {
48                 case <-timer.C:
49                 case <-ctx.Done():
50                         timer.Stop()
51                 }
52                 rl.lock.Lock()
53         }
54         ready := make(chan struct{})
55         go func() {
56                 // close ready when a slot is available _or_ we wake
57                 // up and find ctx has been canceled (meaning Acquire
58                 // has already returned, or is about to).
59                 for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
60                         rl.cond.Wait()
61                 }
62                 close(ready)
63         }()
64         select {
65         case <-ready:
66                 // Wait() returned, so we have the lock.
67                 rl.current++
68                 rl.lock.Unlock()
69         case <-ctx.Done():
70                 // When Wait() returns the lock to our goroutine
71                 // (which might have already happened) we need to
72                 // release it (if we don't do this now, the following
73                 // Lock() can deadlock).
74                 go func() {
75                         <-ready
76                         rl.lock.Unlock()
77                 }()
78                 // Note we may have current > limit until the caller
79                 // calls Release().
80                 rl.lock.Lock()
81                 rl.current++
82                 rl.lock.Unlock()
83         }
84 }
85
86 // Release releases a slot that has been reserved with Acquire.
87 func (rl *requestLimiter) Release() {
88         rl.lock.Lock()
89         rl.current--
90         rl.lock.Unlock()
91         rl.cond.Signal()
92 }
93
94 // Report uses the return values from (*http.Client)Do() to adjust the
95 // outgoing request limit (increase on success, decrease on 503).
96 //
97 // Return value is true if the response was a 503.
98 func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
99         rl.lock.Lock()
100         defer rl.lock.Unlock()
101         is503 := false
102         if err != nil {
103                 uerr := &url.Error{}
104                 if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
105                         // This is how http.Client reports 503 from proxy server
106                         is503 = true
107                 } else {
108                         return false
109                 }
110         } else {
111                 is503 = resp.StatusCode == http.StatusServiceUnavailable
112         }
113         if is503 {
114                 if rl.limit == 0 {
115                         // Concurrency was unlimited until now.
116                         // Calculate new limit based on actual
117                         // concurrency instead of previous limit.
118                         rl.limit = rl.current
119                 }
120                 if time.Now().After(rl.quietUntil) {
121                         // Reduce concurrency limit by half.
122                         rl.limit = (rl.limit + 1) / 2
123                         // Don't start any new calls (or reduce the
124                         // limit even further on additional 503s) for
125                         // a second.
126                         rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
127                 }
128                 return true
129         }
130         if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
131                 // After each non-server-error response, increase
132                 // concurrency limit by at least 10% -- but not beyond
133                 // 2x the highest concurrency level we've seen without
134                 // a failure.
135                 increase := rl.limit / 10
136                 if increase < 1 {
137                         increase = 1
138                 }
139                 rl.limit += increase
140                 if max := rl.current * 2; max > rl.limit {
141                         rl.limit = max
142                 }
143                 rl.cond.Broadcast()
144         }
145         return false
146 }