20602: Attach assigned priority to response log entry.
[arvados.git] / sdk / go / httpserver / request_limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package httpserver
6
7 import (
8         "math"
9         "net/http"
10         "sync"
11         "time"
12
13         "github.com/prometheus/client_golang/prometheus"
14         "github.com/sirupsen/logrus"
15 )
16
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
// concurrency limit are queued and handled in the order indicated by
// the Priority function.
//
// Caller must not modify any RequestLimiter fields after calling its
// methods.
type RequestLimiter struct {
	Handler http.Handler

	// Maximum number of requests being handled at once. Beyond
	// this limit, requests will be queued. Zero means no limit
	// (every request is handled immediately).
	MaxConcurrent int

	// Maximum number of requests in the queue. Beyond this limit,
	// the lowest priority requests will return 503.
	MaxQueue int

	// Priority determines queue ordering. Requests with higher
	// priority are handled first. Requests with equal priority
	// are handled FIFO. If Priority is nil, all requests are
	// handled FIFO. A priority of math.MinInt64 means "return 503
	// instead of queueing" when the request cannot be handled
	// immediately (see enqueue).
	Priority func(req *http.Request, queued time.Time) int64

	// "concurrent_requests", "max_concurrent_requests",
	// "queued_requests", and "max_queued_requests" metrics are
	// registered with Registry, if it is not nil.
	Registry *prometheus.Registry

	setupOnce sync.Once  // runs setup() once, on the first ServeHTTP call
	mtx       sync.Mutex // guards handling and queue
	handling  int        // number of requests currently being handled
	queue     heap       // priority queue of requests waiting for capacity
}
52
// qent is a queue entry: one request waiting for handling capacity.
type qent struct {
	queued   time.Time // when the request was enqueued (FIFO tiebreaker)
	priority int64     // from RequestLimiter.Priority (0 when Priority is nil)
	heappos  int       // current index in the heap, or -1 if not in the heap
	ready    chan bool // true = handle now; false = return 503 now
}
59
60 type heap []*qent
61
// Swap exchanges the entries at i and j, keeping each entry's
// heappos field in sync with its actual index.
func (h heap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heappos, h[j].heappos = i, j
}
66
67 func (h heap) Less(i, j int) bool {
68         pi, pj := h[i].priority, h[j].priority
69         return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
70 }
71
// Len returns the number of entries currently in the queue.
func (h heap) Len() int {
	return len(h)
}
75
// Move element i to a correct position in the heap. When the heap is
// empty, fix(0) is a no-op (does not crash).
func (h heap) fix(i int) {
	// If the initial position is a leaf (i.e., index is greater
	// than the last node's parent index), we only need to move it
	// up, not down. (When the heap is empty this is 0 > -1, so
	// the down-loop below is skipped and fix(0) is safe.)
	uponly := i > (len(h)-2)/2
	// Move the new entry toward the root until reaching a
	// position where the parent already has higher priority.
	for i > 0 {
		parent := (i - 1) / 2
		if h.Less(i, parent) {
			h.Swap(i, parent)
			i = parent
		} else {
			break
		}
	}
	// Move i away from the root until reaching a position where
	// both children already have lower priority. (If the up-loop
	// moved the entry, this loop terminates immediately, since
	// the entry's new children sort after it.)
	for !uponly {
		child := i*2 + 1
		if child+1 < len(h) && h.Less(child+1, child) {
			// Right child has higher priority than left
			// child. Choose right child.
			child = child + 1
		}
		if child < len(h) && h.Less(child, i) {
			// Chosen child has higher priority than i.
			// Swap and continue down.
			h.Swap(i, child)
			i = child
		} else {
			break
		}
	}
}
113
114 func (h *heap) add(ent *qent) {
115         ent.heappos = len(*h)
116         *h = append(*h, ent)
117         h.fix(ent.heappos)
118 }
119
120 func (h *heap) removeMax() *qent {
121         ent := (*h)[0]
122         if len(*h) == 1 {
123                 *h = (*h)[:0]
124         } else {
125                 h.Swap(0, len(*h)-1)
126                 *h = (*h)[:len(*h)-1]
127                 h.fix(0)
128         }
129         ent.heappos = -1
130         return ent
131 }
132
133 func (h *heap) remove(i int) {
134         // Move the last leaf into i's place, then move it to a
135         // correct position.
136         h.Swap(i, len(*h)-1)
137         *h = (*h)[:len(*h)-1]
138         if i < len(*h) {
139                 h.fix(i)
140         }
141 }
142
143 func (rl *RequestLimiter) setup() {
144         if rl.Registry != nil {
145                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
146                         prometheus.GaugeOpts{
147                                 Namespace: "arvados",
148                                 Name:      "concurrent_requests",
149                                 Help:      "Number of requests in progress",
150                         },
151                         func() float64 {
152                                 rl.mtx.Lock()
153                                 defer rl.mtx.Unlock()
154                                 return float64(rl.handling)
155                         },
156                 ))
157                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
158                         prometheus.GaugeOpts{
159                                 Namespace: "arvados",
160                                 Name:      "max_concurrent_requests",
161                                 Help:      "Maximum number of concurrent requests",
162                         },
163                         func() float64 { return float64(rl.MaxConcurrent) },
164                 ))
165                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
166                         prometheus.GaugeOpts{
167                                 Namespace: "arvados",
168                                 Name:      "queued_requests",
169                                 Help:      "Number of requests in queue",
170                         },
171                         func() float64 {
172                                 rl.mtx.Lock()
173                                 defer rl.mtx.Unlock()
174                                 return float64(len(rl.queue))
175                         },
176                 ))
177                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
178                         prometheus.GaugeOpts{
179                                 Namespace: "arvados",
180                                 Name:      "max_queued_requests",
181                                 Help:      "Maximum number of queued requests",
182                         },
183                         func() float64 { return float64(rl.MaxQueue) },
184                 ))
185         }
186 }
187
188 // caller must have lock
189 func (rl *RequestLimiter) runqueue() {
190         // Handle entries from the queue as capacity permits
191         for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
192                 rl.handling++
193                 ent := rl.queue.removeMax()
194                 ent.heappos = -1
195                 ent.ready <- true
196         }
197 }
198
199 // If the queue is too full, fail and remove the lowest-priority
200 // entry. Caller must have lock. Queue must not be empty.
201 func (rl *RequestLimiter) trimqueue() {
202         if len(rl.queue) <= rl.MaxQueue {
203                 return
204         }
205         min := 0
206         for i := range rl.queue {
207                 if i == 0 || rl.queue.Less(min, i) {
208                         min = i
209                 }
210         }
211         rl.queue[min].heappos = -1
212         rl.queue[min].ready <- false
213         rl.queue.remove(min)
214 }
215
216 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
217         rl.mtx.Lock()
218         defer rl.mtx.Unlock()
219         qtime := time.Now()
220         var priority int64
221         if rl.Priority != nil {
222                 priority = rl.Priority(req, qtime)
223         }
224         ent := &qent{
225                 queued:   qtime,
226                 priority: priority,
227                 ready:    make(chan bool, 1),
228                 heappos:  -1,
229         }
230         if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
231                 // fast path, skip the queue
232                 rl.handling++
233                 ent.ready <- true
234                 return ent
235         }
236         if priority == math.MinInt64 {
237                 // Priority func is telling us to return 503
238                 // immediately instead of queueing, regardless of
239                 // queue size, if we can't handle the request
240                 // immediately.
241                 ent.ready <- false
242                 return ent
243         }
244         rl.queue.add(ent)
245         rl.trimqueue()
246         return ent
247 }
248
249 func (rl *RequestLimiter) remove(ent *qent) {
250         rl.mtx.Lock()
251         defer rl.mtx.Unlock()
252         if ent.heappos >= 0 {
253                 rl.queue.remove(ent.heappos)
254                 ent.heappos = -1
255                 ent.ready <- false
256         }
257 }
258
// ServeHTTP passes req to the wrapped Handler, first waiting for a
// queue slot if the concurrency limit has been reached. It responds
// 503 without calling the Handler when the request is rejected
// (trimmed from a full queue, priority math.MinInt64, or client
// disconnected while queued).
func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	rl.setupOnce.Do(rl.setup)
	ent := rl.enqueue(req)
	// Attach the assigned priority to the response log entry.
	SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
	var ok bool
	select {
	case <-req.Context().Done():
		rl.remove(ent)
		// we still need to wait for ent.ready, because
		// sometimes runqueue() will have already decided to
		// send true before our rl.remove() call, and in that
		// case we'll need to decrement rl.handling below.
		ok = <-ent.ready
	case ok = <-ent.ready:
	}
	if !ok {
		resp.WriteHeader(http.StatusServiceUnavailable)
		return
	}
	// Whatever happens in the Handler, release our slot and admit
	// the next queued request afterward.
	defer func() {
		rl.mtx.Lock()
		defer rl.mtx.Unlock()
		rl.handling--
		// unblock the next waiting request
		rl.runqueue()
	}()
	rl.Handler.ServeHTTP(resp, req)
}