1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/sirupsen/logrus"
17 // RequestLimiter wraps http.Handler, limiting the number of
18 // concurrent requests being handled by the wrapped Handler. Requests
19 // that arrive when the handler is already at the specified
20 // concurrency limit are queued and handled in the order indicated by
21 // the Priority function.
// NOTE(review): this listing is line-sampled (the embedded numbering
// jumps); several struct fields (e.g. the wrapped Handler, the
// MaxConcurrent/MaxQueue ints, the mutex, queue, handling counter and
// setupOnce referenced by the methods below) are elided here.
23 // Caller must not modify any RequestLimiter fields after calling its
25 type RequestLimiter struct {
// Field elided by sampling; per the comment it is the concurrency
// cap checked by runqueue()/enqueue() (0 evidently means unlimited).
28 // Maximum number of requests being handled at once. Beyond
29 // this limit, requests will be queued.
// Field elided; per the comment it bounds len(queue) in trimqueue().
32 // Maximum number of requests in the queue. Beyond this limit,
33 // the lowest priority requests will return 503.
36 // Priority determines queue ordering. Requests with higher
37 // priority are handled first. Requests with equal priority
38 // are handled FIFO. If Priority is nil, all requests are
// Priority maps a request plus its enqueue time to an int64 rank;
// enqueue() treats math.MinInt64 as "reject immediately with 503
// rather than queue" (see line 236 below).
40 Priority func(req *http.Request, queued time.Time) int64
42 // "concurrent_requests", "max_concurrent_requests",
43 // "queued_requests", and "max_queued_requests" metrics are
44 // registered with Registry, if it is not nil.
45 Registry *prometheus.Registry
// NOTE(review): lines 46-56 are elided; the `ready` field below
// appears to belong to the per-request qent type, not RequestLimiter
// itself — confirm against the full source.
57 ready    chan bool // true = handle now; false = return 503 now
// Swap exchanges the entries at i and j, keeping each entry's cached
// index (heappos) consistent so an entry can later be located and
// removed by position (see heap.remove / qent.heappos checks).
62 func (h heap) Swap(i, j int) {
63 h[i], h[j] = h[j], h[i]
64 h[i].heappos, h[j].heappos = i, j
// Less reports whether entry i should be served before entry j:
// higher priority wins, and equal priorities fall back to FIFO order
// via the enqueue timestamp (earlier queued => served first). This is
// a max-heap ordering, matching removeMax below.
67 func (h heap) Less(i, j int) bool {
68 pi, pj := h[i].priority, h[j].priority
69 return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
// Len returns the number of queued entries (body elided in this
// listing; presumably `return len(h)` — confirm against full source).
72 func (h heap) Len() int {
76 // Move element i to a correct position in the heap. When the heap is
77 // empty, fix(0) is a no-op (does not crash).
// NOTE(review): the sift loops' headers, swaps, and index updates are
// elided in this listing; only the comparison logic is visible.
78 func (h heap) fix(i int) {
79 // If the initial position is a leaf (i.e., index is greater
80 // than the last node's parent index), we only need to move it
// (len(h)-2)/2 is the index of the last node's parent in a 0-based
// binary heap, so i beyond it means i has no children.
82 uponly := i > (len(h)-2)/2
83 // Move the new entry toward the root until reaching a
84 // position where the parent already has higher priority.
// Sift-up step: while i outranks its parent, swap upward.
87 if h.Less(i, parent) {
94 // Move i away from the root until reaching a position where
95 // both children already have lower priority.
// Sift-down step: pick the higher-priority of the two children…
98 if child+1 < len(h) && h.Less(child+1, child) {
99 // Right child has higher priority than left
100 // child. Choose right child.
// …and swap down while that child outranks i.
103 if child < len(h) && h.Less(child, i) {
104 // Chosen child has higher priority than i.
105 // Swap and continue down.
// add inserts ent as a new entry, recording its position in
// ent.heappos. The append and subsequent sift-up (fix) are elided in
// this listing — confirm against full source.
114 func (h *heap) add(ent *qent) {
115 ent.heappos = len(*h)
// removeMax removes and returns the highest-priority entry (the heap
// root, per Less's max ordering). Interior lines — root extraction,
// moving the last leaf to the root, and re-fixing — are elided here;
// only the final truncation of the backing slice is visible.
120 func (h *heap) removeMax() *qent {
126 *h = (*h)[:len(*h)-1]
// remove deletes the entry at index i (used by RequestLimiter.remove
// via qent.heappos when a queued request is cancelled).
133 func (h *heap) remove(i int) {
134 // Move the last leaf into i's place, then move it to a
// Truncate the backing slice; the preceding swap and the follow-up
// fix(i) call are elided in this listing.
137 *h = (*h)[:len(*h)-1]
// setup registers the limiter's four gauges with rl.Registry (if
// any). Called exactly once via rl.setupOnce.Do in ServeHTTP. The
// gauge callbacks that read mutable state (rl.handling, len(rl.queue))
// take rl.mtx; the Max* gauges read configuration fields that callers
// must not modify after start, so they are read without the lock.
143 func (rl *RequestLimiter) setup() {
144 if rl.Registry != nil {
145 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
146 prometheus.GaugeOpts{
147 Namespace: "arvados",
148 Name:      "concurrent_requests",
149 Help:      "Number of requests in progress",
// Callback reads rl.handling under rl.mtx (Lock call elided in this
// listing; the deferred Unlock is visible below).
153 defer rl.mtx.Unlock()
154 return float64(rl.handling)
157 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
158 prometheus.GaugeOpts{
159 Namespace: "arvados",
160 Name:      "max_concurrent_requests",
161 Help:      "Maximum number of concurrent requests",
163 func() float64 { return float64(rl.MaxConcurrent) },
165 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
166 prometheus.GaugeOpts{
167 Namespace: "arvados",
168 Name:      "queued_requests",
169 Help:      "Number of requests in queue",
173 defer rl.mtx.Unlock()
174 return float64(len(rl.queue))
177 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
178 prometheus.GaugeOpts{
179 Namespace: "arvados",
180 Name:      "max_queued_requests",
181 Help:      "Maximum number of queued requests",
183 func() float64 { return float64(rl.MaxQueue) },
188 // caller must have lock
// runqueue dispatches queued entries in priority order while there is
// spare capacity (MaxConcurrent == 0 means unlimited). The elided
// loop body presumably increments rl.handling and signals
// ent.ready <- true — confirm against full source.
189 func (rl *RequestLimiter) runqueue() {
190 // Handle entries from the queue as capacity permits
191 for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
193 ent := rl.queue.removeMax()
199 // If the queue is too full, fail and remove the lowest-priority
200 // entry. Caller must have lock. Queue must not be empty.
201 func (rl *RequestLimiter) trimqueue() {
// Nothing to do while the queue is within bounds (early return
// elided in this listing).
202 if len(rl.queue) <= rl.MaxQueue {
// Linear scan for the minimum: Less(min, i) true means min outranks
// i, so i becomes the new candidate for lowest priority (the
// `min = i` assignment is elided here).
206 for i := range rl.queue {
207 if i == 0 || rl.queue.Less(min, i) {
// Mark the victim as no longer queued and wake its waiter with
// false, which ServeHTTP turns into a 503 response.
211 rl.queue[min].heappos = -1
212 rl.queue[min].ready <- false
// enqueue creates a qent for req and either admits it immediately
// (capacity available), rejects it (priority == math.MinInt64 and no
// capacity), or places it on the priority queue. Returns the entry so
// ServeHTTP can wait on ent.ready. Several statements (qtime
// assignment, qent construction fields, the fast-path body, the
// queue/trim calls) are elided in this listing.
216 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
218 defer rl.mtx.Unlock()
// Default priority is evidently 0 unless a Priority func is set.
221 if rl.Priority != nil {
222 priority = rl.Priority(req, qtime)
// Buffered (capacity 1) so runqueue/trimqueue can signal without
// blocking even if the waiter has not reached its select yet.
227 ready: make(chan bool, 1),
230 if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
231 // fast path, skip the queue
236 if priority == math.MinInt64 {
237 // Priority func is telling us to return 503
238 // immediately instead of queueing, regardless of
239 // queue size, if we can't handle the request
// remove takes ent out of the queue if it is still queued
// (heappos >= 0; trimqueue/removeMax presumably set it to -1 when an
// entry leaves the heap). Used when the client's request context is
// cancelled while waiting. The elided lines likely also signal
// ent.ready — confirm against full source.
249 func (rl *RequestLimiter) remove(ent *qent) {
251 defer rl.mtx.Unlock()
252 if ent.heappos >= 0 {
253 rl.queue.remove(ent.heappos)
// ServeHTTP enqueues the request, then blocks until the limiter
// signals ready (true => proceed, false => reply 503) or the client
// cancels. On completion it decrements the in-progress count under
// rl.mtx and lets the next queued request run. Select arms and the
// post-serve bookkeeping are partially elided in this listing.
259 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// One-time metric registration; safe under concurrent requests.
260 rl.setupOnce.Do(rl.setup)
261 ent := rl.enqueue(req)
262 SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
265 case <-req.Context().Done():
// Context cancelled: presumably rl.remove(ent) runs here (elided)…
267 // we still need to wait for ent.ready, because
268 // sometimes runqueue() will have already decided to
269 // send true before our rl.remove() call, and in that
270 // case we'll need to decrement rl.handling below.
272 case ok = <-ent.ready:
// ok == false => capacity exhausted / trimmed from queue: 503.
275 resp.WriteHeader(http.StatusServiceUnavailable)
// Post-serve cleanup under rl.mtx (Lock and handling decrement
// elided; the runqueue() call implied by the comment below releases
// the next waiter).
280 defer rl.mtx.Unlock()
282 // unblock the next waiting request
285 rl.Handler.ServeHTTP(resp, req)