sdk/go/httpserver/request_limiter.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package httpserver

import (
        "container/heap"
        "math"
        "net/http"
        "sync"
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
)

const MinPriority = math.MinInt64

// Prometheus typically polls every 10 seconds, but it doesn't cost us
// much to also accommodate higher frequency collection by updating
// internal stats more frequently. (This limits time resolution only
// for the metrics that aren't generated on the fly.)
const metricsUpdateInterval = time.Second

// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
// concurrency limit are queued and handled in the order indicated by
// the Priority function.
//
// Caller must not modify any RequestLimiter fields after calling its
// methods.
type RequestLimiter struct {
        Handler http.Handler

        // Maximum number of requests being handled at once. Beyond
        // this limit, requests will be queued.
        MaxConcurrent int

        // Maximum number of requests in the queue. Beyond this limit,
        // the lowest priority requests will return 503.
        MaxQueue int

        // Priority determines queue ordering. Requests with higher
        // priority are handled first. Requests with equal priority
        // are handled FIFO. If Priority is nil, all requests are
        // handled FIFO.
        Priority func(req *http.Request, queued time.Time) int64

        // If a request's Priority() is MinPriority and it has been
        // queued longer than this duration without starting
        // processing, it is rejected with 503.
        MaxQueueTimeForMinPriority time.Duration

        // "concurrent_requests", "max_concurrent_requests",
        // "queued_requests", and "max_queued_requests" metrics are
        // registered with Registry, if it is not nil.
        Registry *prometheus.Registry

        setupOnce     sync.Once
        mQueueDelay   *prometheus.SummaryVec
        mQueueTimeout *prometheus.SummaryVec
        mQueueUsage   *prometheus.GaugeVec
        mtx           sync.Mutex
        handling      int
        queue         queue
}
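// A minimal usage sketch (the handler, registry, and priority scheme
// below are illustrative assumptions, not values taken from elsewhere
// in Arvados):
//
//      reg := prometheus.NewRegistry()
//      rl := &RequestLimiter{
//              Handler:       mux, // any http.Handler
//              MaxConcurrent: 64,  // 0 means unlimited
//              MaxQueue:      1024,
//              Priority: func(req *http.Request, queued time.Time) int64 {
//                      if req.Method == http.MethodGet {
//                              return 1 // hypothetical: prefer read requests
//                      }
//                      return 0
//              },
//              Registry: reg,
//      }
//      srv := &http.Server{Addr: ":8080", Handler: rl}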

type qent struct {
        queued   time.Time
        priority int64
        heappos  int
        ready    chan bool // true = handle now; false = return 503 now
}

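// queue is a priority queue of waiting requests. It implements
// heap.Interface (container/heap): Less orders entries highest
// priority first, breaking ties FIFO by queue-arrival time, and Swap
// keeps each entry's heappos in sync with its slice index.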
type queue []*qent

func (h queue) Swap(i, j int) {
        h[i], h[j] = h[j], h[i]
        h[i].heappos, h[j].heappos = i, j
}

func (h queue) Less(i, j int) bool {
        pi, pj := h[i].priority, h[j].priority
        return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
}

func (h queue) Len() int {
        return len(h)
}

func (h *queue) Push(x interface{}) {
        n := len(*h)
        ent := x.(*qent)
        ent.heappos = n
        *h = append(*h, ent)
}

func (h *queue) Pop() interface{} {
        n := len(*h)
        ent := (*h)[n-1]
        ent.heappos = -1
        (*h)[n-1] = nil
        *h = (*h)[0 : n-1]
        return ent
}

func (h *queue) add(ent *qent) {
        // heap.Push (rather than a bare h.Push append) maintains the
        // ordering that removeMax and remove rely on.
        heap.Push(h, ent)
}

func (h *queue) removeMax() *qent {
        return heap.Pop(h).(*qent)
}

func (h *queue) remove(i int) {
        heap.Remove(h, i)
}

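// setup registers the limiter's metrics with rl.Registry (if any)
// and starts a goroutine that periodically updates the
// queued_requests gauge for each priority bucket. It is called at
// most once, via rl.setupOnce.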
func (rl *RequestLimiter) setup() {
        if rl.Registry != nil {
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "concurrent_requests",
                                Help:      "Number of requests in progress",
                        },
                        func() float64 {
                                rl.mtx.Lock()
                                defer rl.mtx.Unlock()
                                return float64(rl.handling)
                        },
                ))
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "max_concurrent_requests",
                                Help:      "Maximum number of concurrent requests",
                        },
                        func() float64 { return float64(rl.MaxConcurrent) },
                ))
                rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                        Namespace: "arvados",
                        Name:      "queued_requests",
                        Help:      "Number of requests in queue",
                }, []string{"priority"})
                rl.Registry.MustRegister(rl.mQueueUsage)
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "max_queued_requests",
                                Help:      "Maximum number of queued requests",
                        },
                        func() float64 { return float64(rl.MaxQueue) },
                ))
                rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
                        Namespace:  "arvados",
                        Name:       "queue_delay_seconds",
                        Help:       "Time spent in the incoming request queue before start of processing",
                        Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
                }, []string{"priority"})
                rl.Registry.MustRegister(rl.mQueueDelay)
                rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
                        Namespace:  "arvados",
                        Name:       "queue_timeout_seconds",
                        Help:       "Time spent in the incoming request queue before client timed out or disconnected",
                        Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
                }, []string{"priority"})
                rl.Registry.MustRegister(rl.mQueueTimeout)
                go func() {
                        for range time.NewTicker(metricsUpdateInterval).C {
                                var low, normal, high int
                                rl.mtx.Lock()
                                for _, ent := range rl.queue {
                                        switch {
                                        case ent.priority < 0:
                                                low++
                                        case ent.priority > 0:
                                                high++
                                        default:
                                                normal++
                                        }
                                }
                                rl.mtx.Unlock()
                                rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
                                rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
                                rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
                        }
                }()
        }
}
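// The metrics registered above are only visible once the caller
// serves rl.Registry somewhere. A minimal sketch (assuming the
// promhttp package from client_golang; the mux and path are
// illustrative):
//
//      mux.Handle("/metrics", promhttp.HandlerFor(rl.Registry, promhttp.HandlerOpts{}))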

// caller must have lock
func (rl *RequestLimiter) runqueue() {
        // Handle entries from the queue as capacity permits
        for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
                rl.handling++
                ent := rl.queue.removeMax()
                ent.ready <- true
        }
}

// If the queue is too full, fail and remove the lowest-priority
// entry. Caller must have lock. Queue must not be empty.
func (rl *RequestLimiter) trimqueue() {
        if len(rl.queue) <= rl.MaxQueue {
                return
        }
        min := 0
        for i := range rl.queue {
                if i == 0 || rl.queue.Less(min, i) {
                        min = i
                }
        }
        rl.queue[min].ready <- false
        rl.queue.remove(min)
}

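// enqueue makes a queue entry for req. If concurrency is unlimited or
// the handler is below MaxConcurrent, the entry is marked ready
// immediately (fast path); otherwise it is added to the priority
// queue, evicting the lowest-priority entry if the queue exceeds
// MaxQueue.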
func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
        rl.mtx.Lock()
        defer rl.mtx.Unlock()
        qtime := time.Now()
        var priority int64
        if rl.Priority != nil {
                priority = rl.Priority(req, qtime)
        }
        ent := &qent{
                queued:   qtime,
                priority: priority,
                ready:    make(chan bool, 1),
                heappos:  -1,
        }
        if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
                // fast path, skip the queue
                rl.handling++
                ent.ready <- true
                return ent
        }
        rl.queue.add(ent)
        rl.trimqueue()
        return ent
}

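// remove cancels a queued entry: if ent is still in the queue, it is
// dropped from the heap and its waiter is told to return 503. If ent
// has already left the queue, remove is a no-op.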
func (rl *RequestLimiter) remove(ent *qent) {
        rl.mtx.Lock()
        defer rl.mtx.Unlock()
        if ent.heappos >= 0 {
                rl.queue.remove(ent.heappos)
                ent.ready <- false
        }
}

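// ServeHTTP queues the request, waits until it is either ready to
// handle or rejected, then passes it to the wrapped Handler or
// responds 503 accordingly. Queue wait time is reported to the
// queue_delay/queue_timeout metrics, and a finished request frees its
// slot and wakes the next queued request.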
func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rl.setupOnce.Do(rl.setup)
        ent := rl.enqueue(req)
        SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
        if ent.priority == MinPriority {
                // Note that MaxQueueTimeForMinPriority==0 does not
                // cancel a req that skips the queue, because in that
                // case rl.enqueue() has already fired ready<-true and
                // rl.remove() is a no-op.
                go func() {
                        time.Sleep(rl.MaxQueueTimeForMinPriority)
                        rl.remove(ent)
                }()
        }
        var ok bool
        select {
        case <-req.Context().Done():
                rl.remove(ent)
                // we still need to wait for ent.ready, because
                // sometimes runqueue() will have already decided to
                // send true before our rl.remove() call, and in that
                // case we'll need to decrement rl.handling below.
                ok = <-ent.ready
        case ok = <-ent.ready:
        }

        // Report time spent in queue in the appropriate bucket:
        // mQueueDelay if the request actually got processed,
        // mQueueTimeout if it was abandoned or cancelled before
        // getting a processing slot.
        var series *prometheus.SummaryVec
        if ok {
                series = rl.mQueueDelay
        } else {
                series = rl.mQueueTimeout
        }
        if series != nil {
                var qlabel string
                switch {
                case ent.priority < 0:
                        qlabel = "low"
                case ent.priority > 0:
                        qlabel = "high"
                default:
                        qlabel = "normal"
                }
                series.WithLabelValues(qlabel).Observe(time.Since(ent.queued).Seconds())
        }

        if !ok {
                resp.WriteHeader(http.StatusServiceUnavailable)
                return
        }
        defer func() {
                rl.mtx.Lock()
                defer rl.mtx.Unlock()
                rl.handling--
                // unblock the next waiting request
                rl.runqueue()
        }()
        rl.Handler.ServeHTTP(resp, req)
}
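// A rough sketch of exercising the limiter directly (httptest and the
// trivial handler below are illustrative assumptions, not taken from
// this package's tests):
//
//      rl := &RequestLimiter{
//              MaxConcurrent: 1,
//              MaxQueue:      1,
//              Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//                      w.WriteHeader(http.StatusOK)
//              }),
//      }
//      resp := httptest.NewRecorder()
//      rl.ServeHTTP(resp, httptest.NewRequest("GET", "/", nil))
//      // resp.Code is 200 here; concurrent requests beyond the
//      // limits above would queue or receive 503.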