21700: Install Bundler system-wide in Rails postinst
[arvados.git] / sdk / go / httpserver / request_limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package httpserver
6
7 import (
8         "container/heap"
9         "math"
10         "net/http"
11         "sync"
12         "time"
13
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
// MinPriority is the lowest possible value the Priority function can
// return. Requests at this priority are additionally subject to
// RequestQueue.MaxQueueTimeForMinPriority (see ServeHTTP).
const MinPriority = math.MinInt64

// Prometheus typically polls every 10 seconds, but it doesn't cost us
// much to also accommodate higher frequency collection by updating
// internal stats more frequently. (This limits time resolution only
// for the metrics that aren't generated on the fly.)
const metricsUpdateInterval = time.Second
25
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
// concurrency limit are queued and handled in the order indicated by
// the Priority function.
//
// Caller must not modify any RequestLimiter fields after calling its
// methods.
type RequestLimiter struct {
	Handler http.Handler

	// Queue determines which queue a request is assigned to.
	Queue func(req *http.Request) *RequestQueue

	// Priority determines queue ordering. Requests with higher
	// priority are handled first. Requests with equal priority
	// are handled FIFO. If Priority is nil, all requests are
	// handled FIFO.
	Priority func(req *http.Request, queued time.Time) int64

	// "concurrent_requests", "max_concurrent_requests",
	// "queued_requests", and "max_queued_requests" metrics are
	// registered with Registry, if it is not nil.
	Registry *prometheus.Registry

	// setupOnce ensures setup() runs exactly once, on the first
	// call to ServeHTTP.
	setupOnce sync.Once
	// mQueueDelay observes queue time for requests that were
	// eventually handled; mQueueTimeout observes queue time for
	// requests that were rejected or cancelled. Both are nil when
	// Registry is nil.
	mQueueDelay   *prometheus.SummaryVec
	mQueueTimeout *prometheus.SummaryVec
	// mQueueUsage reports current queue length per queue label
	// and priority bucket; updated by the setup() ticker goroutine.
	mQueueUsage *prometheus.GaugeVec
	// mtx guards rqs and all RequestQueue internal state (queue,
	// handling).
	mtx sync.Mutex
	rqs map[*RequestQueue]bool // all RequestQueues in use
}
58
// RequestQueue defines the concurrency limit and queueing policy for
// the subset of requests that RequestLimiter.Queue assigns to it.
type RequestQueue struct {
	// Label for metrics. No two queues should have the same label.
	Label string

	// Maximum number of requests being handled at once. Beyond
	// this limit, requests will be queued. Zero means no limit
	// (see runqueue/enqueue).
	MaxConcurrent int

	// Maximum number of requests in the queue. Beyond this limit,
	// the lowest priority requests will return 503.
	MaxQueue int

	// Return 503 for any request for which Priority() returns
	// MinPriority if it spends longer than this in the queue
	// before starting processing.
	MaxQueueTimeForMinPriority time.Duration

	// queue holds waiting requests ordered per queue.Less;
	// handling counts requests currently being processed. Both
	// are guarded by the owning RequestLimiter's mtx.
	queue    queue
	handling int
}
79
// qent is one queued (or in-flight) request's entry.
type qent struct {
	rq       *RequestQueue // queue this request was assigned to
	queued   time.Time     // time the request entered the queue
	priority int64         // RequestLimiter.Priority result (0 when Priority is nil)
	heappos  int           // index within rq.queue, or -1 when not queued
	ready    chan bool // true = handle now; false = return 503 now
}
87
// queue implements heap.Interface as a max-heap of *qent: highest
// priority first, FIFO (earliest queued time) among equal priorities.
type queue []*qent
89
90 func (h queue) Swap(i, j int) {
91         h[i], h[j] = h[j], h[i]
92         h[i].heappos, h[j].heappos = i, j
93 }
94
95 func (h queue) Less(i, j int) bool {
96         pi, pj := h[i].priority, h[j].priority
97         return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
98 }
99
// Len returns the number of queued entries (heap.Interface).
func (h queue) Len() int {
	return len(h)
}
103
104 func (h *queue) Push(x interface{}) {
105         n := len(*h)
106         ent := x.(*qent)
107         ent.heappos = n
108         *h = append(*h, ent)
109 }
110
111 func (h *queue) Pop() interface{} {
112         n := len(*h)
113         ent := (*h)[n-1]
114         ent.heappos = -1
115         (*h)[n-1] = nil
116         *h = (*h)[0 : n-1]
117         return ent
118 }
119
120 func (h *queue) add(ent *qent) {
121         ent.heappos = h.Len()
122         h.Push(ent)
123 }
124
125 func (h *queue) removeMax() *qent {
126         return heap.Pop(h).(*qent)
127 }
128
// remove deletes the entry at heap index i, preserving the heap
// invariant (heappos bookkeeping happens in Swap/Pop). Caller must
// hold the lock.
func (h *queue) remove(i int) {
	heap.Remove(h, i)
}
132
// setup registers the limiter's prometheus metrics (when Registry is
// non-nil) and starts a background goroutine that refreshes the
// per-queue gauge values every metricsUpdateInterval. It is invoked
// exactly once, via setupOnce, on the first ServeHTTP call.
func (rl *RequestLimiter) setup() {
	if rl.Registry != nil {
		mCurrentReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "arvados",
			Name:      "concurrent_requests",
			Help:      "Number of requests in progress",
		}, []string{"queue"})
		rl.Registry.MustRegister(mCurrentReqs)
		mMaxReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "arvados",
			Name:      "max_concurrent_requests",
			Help:      "Maximum number of concurrent requests",
		}, []string{"queue"})
		rl.Registry.MustRegister(mMaxReqs)
		mMaxQueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "arvados",
			Name:      "max_queued_requests",
			Help:      "Maximum number of queued requests",
		}, []string{"queue"})
		rl.Registry.MustRegister(mMaxQueue)
		rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "arvados",
			Name:      "queued_requests",
			Help:      "Number of requests in queue",
		}, []string{"queue", "priority"})
		rl.Registry.MustRegister(rl.mQueueUsage)
		rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
			Namespace:  "arvados",
			Name:       "queue_delay_seconds",
			Help:       "Time spent in the incoming request queue before start of processing",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
		}, []string{"queue", "priority"})
		rl.Registry.MustRegister(rl.mQueueDelay)
		rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
			Namespace:  "arvados",
			Name:       "queue_timeout_seconds",
			Help:       "Time spent in the incoming request queue before client timed out or disconnected",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
		}, []string{"queue", "priority"})
		rl.Registry.MustRegister(rl.mQueueTimeout)
		go func() {
			// Periodically recompute the gauge metrics for
			// every queue seen so far. The summary metrics
			// (mQueueDelay/mQueueTimeout) are updated
			// inline by ServeHTTP instead.
			//
			// NOTE(review): this ticker goroutine runs for
			// the remaining life of the process; setup()
			// runs at most once per RequestLimiter, so at
			// most one goroutine is started here.
			for range time.NewTicker(metricsUpdateInterval).C {
				rl.mtx.Lock()
				for rq := range rl.rqs {
					// Bucket queued entries by priority sign
					// to match the "priority" label values
					// used elsewhere (low/normal/high).
					var low, normal, high int
					for _, ent := range rq.queue {
						switch {
						case ent.priority < 0:
							low++
						case ent.priority > 0:
							high++
						default:
							normal++
						}
					}
					mCurrentReqs.WithLabelValues(rq.Label).Set(float64(rq.handling))
					mMaxReqs.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent))
					mMaxQueue.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue))
					rl.mQueueUsage.WithLabelValues(rq.Label, "low").Set(float64(low))
					rl.mQueueUsage.WithLabelValues(rq.Label, "normal").Set(float64(normal))
					rl.mQueueUsage.WithLabelValues(rq.Label, "high").Set(float64(high))
				}
				rl.mtx.Unlock()
			}
		}()
	}
}
200
201 // caller must have lock
202 func (rq *RequestQueue) runqueue() {
203         // Handle entries from the queue as capacity permits
204         for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) {
205                 rq.handling++
206                 ent := rq.queue.removeMax()
207                 ent.ready <- true
208         }
209 }
210
211 // If the queue is too full, fail and remove the lowest-priority
212 // entry. Caller must have lock. Queue must not be empty.
213 func (rq *RequestQueue) trimqueue() {
214         if len(rq.queue) <= rq.MaxQueue {
215                 return
216         }
217         min := 0
218         for i := range rq.queue {
219                 if i == 0 || rq.queue.Less(min, i) {
220                         min = i
221                 }
222         }
223         rq.queue[min].ready <- false
224         rq.queue.remove(min)
225 }
226
227 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
228         rl.mtx.Lock()
229         defer rl.mtx.Unlock()
230         qtime := time.Now()
231         var priority int64
232         if rl.Priority != nil {
233                 priority = rl.Priority(req, qtime)
234         }
235         ent := &qent{
236                 rq:       rl.Queue(req),
237                 queued:   qtime,
238                 priority: priority,
239                 ready:    make(chan bool, 1),
240                 heappos:  -1,
241         }
242         if rl.rqs == nil {
243                 rl.rqs = map[*RequestQueue]bool{}
244         }
245         rl.rqs[ent.rq] = true
246         if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling {
247                 // fast path, skip the queue
248                 ent.rq.handling++
249                 ent.ready <- true
250                 return ent
251         }
252         ent.rq.queue.add(ent)
253         ent.rq.trimqueue()
254         return ent
255 }
256
257 func (rl *RequestLimiter) remove(ent *qent) {
258         rl.mtx.Lock()
259         defer rl.mtx.Unlock()
260         if ent.heappos >= 0 {
261                 ent.rq.queue.remove(ent.heappos)
262                 ent.ready <- false
263         }
264 }
265
266 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
267         rl.setupOnce.Do(rl.setup)
268         ent := rl.enqueue(req)
269         SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label})
270         if ent.priority == MinPriority {
271                 // Note that MaxQueueTime==0 does not cancel a req
272                 // that skips the queue, because in that case
273                 // rl.enqueue() has already fired ready<-true and
274                 // rl.remove() is a no-op.
275                 go func() {
276                         time.Sleep(ent.rq.MaxQueueTimeForMinPriority)
277                         rl.remove(ent)
278                 }()
279         }
280         var ok bool
281         select {
282         case <-req.Context().Done():
283                 rl.remove(ent)
284                 // we still need to wait for ent.ready, because
285                 // sometimes runqueue() will have already decided to
286                 // send true before our rl.remove() call, and in that
287                 // case we'll need to decrement ent.rq.handling below.
288                 ok = <-ent.ready
289         case ok = <-ent.ready:
290         }
291
292         // Report time spent in queue in the appropriate bucket:
293         // mQueueDelay if the request actually got processed,
294         // mQueueTimeout if it was abandoned or cancelled before
295         // getting a processing slot.
296         var series *prometheus.SummaryVec
297         if ok {
298                 series = rl.mQueueDelay
299         } else {
300                 series = rl.mQueueTimeout
301         }
302         if series != nil {
303                 var qlabel string
304                 switch {
305                 case ent.priority < 0:
306                         qlabel = "low"
307                 case ent.priority > 0:
308                         qlabel = "high"
309                 default:
310                         qlabel = "normal"
311                 }
312                 series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
313         }
314
315         if !ok {
316                 resp.WriteHeader(http.StatusServiceUnavailable)
317                 return
318         }
319         defer func() {
320                 rl.mtx.Lock()
321                 defer rl.mtx.Unlock()
322                 ent.rq.handling--
323                 // unblock the next waiting request
324                 ent.rq.runqueue()
325         }()
326         rl.Handler.ServeHTTP(resp, req)
327 }