20602: Add priority queue to RequestLimiter.
sdk/go/httpserver/request_limiter.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package httpserver

import (
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
// concurrency limit are queued and handled in the order indicated by
// the Priority function.
//
// Caller must not modify any RequestLimiter fields after calling its
// methods.
type RequestLimiter struct {
	Handler http.Handler

	// Maximum number of requests being handled at once. Beyond
	// this limit, requests will be queued.
	MaxConcurrent int

	// Maximum number of requests in the queue. Beyond this limit,
	// the lowest-priority requests will return 503.
	MaxQueue int

	// Priority determines queue ordering. Requests with higher
	// priority are handled first. Requests with equal priority
	// are handled FIFO. If Priority is nil, all requests are
	// handled FIFO.
	Priority func(req *http.Request, queued time.Time) int64

	// "concurrent_requests", "max_concurrent_requests",
	// "queued_requests", and "max_queued_requests" metrics are
	// registered with Registry, if it is not nil.
	Registry *prometheus.Registry

	setupOnce sync.Once
	mtx       sync.Mutex
	handling  int
	queue     heap
}
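
// Illustrative usage (a minimal sketch, not taken from this
// repository; "mux", the limits, and the priority rule below are
// hypothetical):
//
//	mux := http.NewServeMux()
//	rl := &RequestLimiter{
//		Handler:       mux,
//		MaxConcurrent: 64,
//		MaxQueue:      128,
//		// Hypothetical rule: favor requests that carry an
//		// X-Priority header; everything else is FIFO.
//		Priority: func(req *http.Request, queued time.Time) int64 {
//			if req.Header.Get("X-Priority") != "" {
//				return 1
//			}
//			return 0
//		},
//		Registry: prometheus.NewRegistry(),
//	}
//	srv := &http.Server{Addr: ":8080", Handler: rl}
//	_ = srv.ListenAndServe()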

// qent is a request waiting in the queue (or already released from
// it). ready reports the outcome to the waiting goroutine.
type qent struct {
	queued   time.Time
	priority int64
	heappos  int
	ready    chan bool // true = handle now; false = return 503 now
}

// heap is a max-heap of queue entries: highest priority first, FIFO
// among entries with equal priority.
type heap []*qent

func (h heap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heappos, h[j].heappos = i, j
}

func (h heap) Less(i, j int) bool {
	pi, pj := h[i].priority, h[j].priority
	return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
}

func (h heap) Len() int {
	return len(h)
}

// Move element i to a correct position in the heap. When the heap is
// empty, fix(0) is a no-op (does not crash).
func (h heap) fix(i int) {
	// If the initial position is a leaf (i.e., index is greater
	// than the last node's parent index), we only need to move it
	// up, not down.
	uponly := i > (len(h)-2)/2
	// Move the new entry toward the root until reaching a
	// position where the parent already has higher priority.
	for i > 0 {
		parent := (i - 1) / 2
		if h.Less(i, parent) {
			h.Swap(i, parent)
			i = parent
		} else {
			break
		}
	}
	// Move i away from the root until reaching a position where
	// both children already have lower priority.
	for !uponly {
		child := i*2 + 1
		if child+1 < len(h) && h.Less(child+1, child) {
			// Right child has higher priority than left
			// child. Choose right child.
			child = child + 1
		}
		if child < len(h) && h.Less(child, i) {
			// Chosen child has higher priority than i.
			// Swap and continue down.
			h.Swap(i, child)
			i = child
		} else {
			break
		}
	}
}

// add appends ent to the heap and moves it to its correct position.
func (h *heap) add(ent *qent) {
	ent.heappos = len(*h)
	*h = append(*h, ent)
	h.fix(ent.heappos)
}

// removeMax removes and returns the highest-priority entry, setting
// its heappos to -1.
func (h *heap) removeMax() *qent {
	ent := (*h)[0]
	if len(*h) == 1 {
		*h = (*h)[:0]
	} else {
		h.Swap(0, len(*h)-1)
		*h = (*h)[:len(*h)-1]
		h.fix(0)
	}
	ent.heappos = -1
	return ent
}

// remove deletes the entry at index i. Note the removed entry's
// heappos is left stale (Swap overwrites it before truncation);
// callers that keep a reference to the entry must reset it.
func (h *heap) remove(i int) {
	// Move the last leaf into i's place, then move it to a
	// correct position.
	h.Swap(i, len(*h)-1)
	*h = (*h)[:len(*h)-1]
	if i < len(*h) {
		h.fix(i)
	}
}
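
// Ordering sketch (assumes the heap helpers above; the priorities and
// timestamps are made up for illustration):
//
//	var h heap
//	t := time.Now()
//	h.add(&qent{priority: 1, queued: t})
//	h.add(&qent{priority: 2, queued: t.Add(time.Second)})
//	h.add(&qent{priority: 2, queued: t})
//	// removeMax pops priority 2 (earlier queued) first, then
//	// priority 2 (later queued), then priority 1.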

// setup registers the limiter's metrics with rl.Registry, if any. It
// runs at most once, via setupOnce.
func (rl *RequestLimiter) setup() {
	if rl.Registry != nil {
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "concurrent_requests",
				Help:      "Number of requests in progress",
			},
			func() float64 {
				rl.mtx.Lock()
				defer rl.mtx.Unlock()
				return float64(rl.handling)
			},
		))
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "max_concurrent_requests",
				Help:      "Maximum number of concurrent requests",
			},
			func() float64 { return float64(rl.MaxConcurrent) },
		))
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "queued_requests",
				Help:      "Number of requests in queue",
			},
			func() float64 {
				rl.mtx.Lock()
				defer rl.mtx.Unlock()
				return float64(len(rl.queue))
			},
		))
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "max_queued_requests",
				Help:      "Maximum number of queued requests",
			},
			func() float64 { return float64(rl.MaxQueue) },
		))
	}
}
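
// Exposition sketch (assumes Registry is set as in the usage example
// above; promhttp is the standard client_golang exposition handler
// and is not imported by this file):
//
//	import "github.com/prometheus/client_golang/prometheus/promhttp"
//
//	reg := prometheus.NewRegistry()
//	rl := &RequestLimiter{Handler: mux, MaxConcurrent: 64, Registry: reg}
//	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
//	// Gauges appear as arvados_concurrent_requests,
//	// arvados_max_concurrent_requests, arvados_queued_requests,
//	// and arvados_max_queued_requests.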

// caller must have lock
func (rl *RequestLimiter) runqueue() {
	// Handle entries from the queue as capacity permits
	for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
		rl.handling++
		ent := rl.queue.removeMax()
		ent.heappos = -1
		ent.ready <- true
	}
}

// If the queue is too full, fail and remove the lowest-priority
// entry. Caller must have lock. Queue must not be empty.
func (rl *RequestLimiter) trimqueue() {
	if len(rl.queue) <= rl.MaxQueue {
		return
	}
	min := 0
	for i := range rl.queue {
		if i == 0 || rl.queue.Less(min, i) {
			min = i
		}
	}
	ent := rl.queue[min]
	ent.ready <- false
	rl.queue.remove(min)
	// Reset heappos after remove(): the Swap inside remove()
	// overwrites it with a stale index, and a stale non-negative
	// heappos would let a later RequestLimiter.remove() call
	// corrupt the queue.
	ent.heappos = -1
}

// enqueue creates a queue entry for req. If the concurrency limit
// has not been reached, the entry is marked ready to handle
// immediately; otherwise it is added to the queue (which is then
// trimmed to MaxQueue).
func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
	rl.mtx.Lock()
	defer rl.mtx.Unlock()
	qtime := time.Now()
	var priority int64
	if rl.Priority != nil {
		priority = rl.Priority(req, qtime)
	}
	ent := &qent{
		queued:   qtime,
		priority: priority,
		ready:    make(chan bool, 1),
		heappos:  -1,
	}
	if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
		// fast path, skip the queue
		rl.handling++
		ent.ready <- true
		return ent
	}
	rl.queue.add(ent)
	rl.trimqueue()
	return ent
}

// remove drops ent from the queue, if it is still queued, and tells
// its waiting goroutine to return 503.
func (rl *RequestLimiter) remove(ent *qent) {
	rl.mtx.Lock()
	defer rl.mtx.Unlock()
	if ent.heappos >= 0 {
		rl.queue.remove(ent.heappos)
		ent.heappos = -1
		ent.ready <- false
	}
}

func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	rl.setupOnce.Do(rl.setup)
	ent := rl.enqueue(req)
	var ok bool
	select {
	case <-req.Context().Done():
		rl.remove(ent)
		// we still need to wait for ent.ready, because
		// sometimes runqueue() will have already decided to
		// send true before our rl.remove() call, and in that
		// case we'll need to decrement rl.handling below.
		ok = <-ent.ready
	case ok = <-ent.ready:
	}
	if !ok {
		resp.WriteHeader(http.StatusServiceUnavailable)
		return
	}
	defer func() {
		rl.mtx.Lock()
		defer rl.mtx.Unlock()
		rl.handling--
		// unblock the next waiting request
		rl.runqueue()
	}()
	rl.Handler.ServeHTTP(resp, req)
}