1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
12 "github.com/prometheus/client_golang/prometheus"
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
// concurrency limit are queued and handled in the order indicated by
// the Priority function.
// Caller must not modify any RequestLimiter fields after calling its
type RequestLimiter struct {
	// Maximum number of requests being handled at once. Beyond
	// this limit, requests will be queued.
	// Maximum number of requests in the queue. Beyond this limit,
	// the lowest priority requests will return 503.
	// Priority determines queue ordering. Requests with higher
	// priority are handled first. Requests with equal priority
	// are handled FIFO. If Priority is nil, all requests are
	Priority func(req *http.Request, queued time.Time) int64
	// "concurrent_requests", "max_concurrent_requests",
	// "queued_requests", and "max_queued_requests" metrics are
	// registered with Registry, if it is not nil.
	Registry *prometheus.Registry
	// NOTE(review): this field is referenced per-request as
	// ent.ready elsewhere in this file, so it appears to belong
	// to the internal queue-entry type rather than to
	// RequestLimiter itself — confirm against the full file.
	ready chan bool // true = handle now; false = return 503 now
// Swap exchanges the entries at indexes i and j, keeping each
// entry's cached heap index (heappos) in sync with its new position.
func (h heap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heappos, h[j].heappos = i, j
// Less reports whether entry i should be handled before entry j:
// higher priority wins, and ties are broken FIFO by queue time.
func (h heap) Less(i, j int) bool {
	pi, pj := h[i].priority, h[j].priority
	return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
// Len returns the number of entries currently in the heap.
func (h heap) Len() int {
// Move element i to a correct position in the heap. When the heap is
// empty, fix(0) is a no-op (does not crash).
func (h heap) fix(i int) {
	// If the initial position is a leaf (i.e., index is greater
	// than the last node's parent index), we only need to move it
	// toward the root (no sift-down pass is needed).
	uponly := i > (len(h)-2)/2
	// Move the new entry toward the root until reaching a
	// position where the parent already has higher priority.
		if h.Less(i, parent) {
	// Move i away from the root until reaching a position where
	// both children already have lower priority.
		if child+1 < len(h) && h.Less(child+1, child) {
			// Right child has higher priority than left
			// child. Choose right child.
		if child < len(h) && h.Less(child, i) {
			// Chosen child has higher priority than i.
			// Swap and continue down.
// add inserts ent into the heap, caching its index in ent.heappos
// (kept up to date afterwards by Swap).
func (h *heap) add(ent *qent) {
	ent.heappos = len(*h)
// removeMax removes and returns the root, i.e., the entry that
// should be handled next according to Less (highest priority,
// oldest first on ties).
func (h *heap) removeMax() *qent {
	// drop the vacated last slot
	*h = (*h)[:len(*h)-1]
// remove deletes the entry at heap index i, restoring heap order
// afterwards.
func (h *heap) remove(i int) {
	// Move the last leaf into i's place, then move it to a
	*h = (*h)[:len(*h)-1]
// setup registers the limiter's gauge metrics with rl.Registry, if
// one was provided. It is invoked exactly once, via rl.setupOnce in
// ServeHTTP.
func (rl *RequestLimiter) setup() {
	if rl.Registry != nil {
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "concurrent_requests",
				Help:      "Number of requests in progress",
			// gauge callback: reads rl.handling under rl.mtx
			defer rl.mtx.Unlock()
			return float64(rl.handling)
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "max_concurrent_requests",
				Help:      "Maximum number of concurrent requests",
			// config value; no locking needed (caller must not
			// modify fields after setup, per type comment)
			func() float64 { return float64(rl.MaxConcurrent) },
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "queued_requests",
				Help:      "Number of requests in queue",
			// gauge callback: reads queue length under rl.mtx
			defer rl.mtx.Unlock()
			return float64(len(rl.queue))
		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: "arvados",
				Name:      "max_queued_requests",
				Help:      "Maximum number of queued requests",
			func() float64 { return float64(rl.MaxQueue) },
// runqueue hands queued entries to handlers while there is spare
// concurrency capacity (rl.MaxConcurrent == 0 means unlimited).
// caller must have lock
func (rl *RequestLimiter) runqueue() {
	// Handle entries from the queue as capacity permits
	for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
		// highest-priority (per Less) entry goes first
		ent := rl.queue.removeMax()
// If the queue is too full, fail and remove the lowest-priority
// entry. Caller must have lock. Queue must not be empty.
func (rl *RequestLimiter) trimqueue() {
	if len(rl.queue) <= rl.MaxQueue {
	// Linear scan for the minimum-priority entry: the heap only
	// keeps the *maximum* cheap to find.
	for i := range rl.queue {
		// Less(min, i) means min outranks i, so i becomes the
		// new minimum.
		if i == 0 || rl.queue.Less(min, i) {
	// heappos = -1 marks the entry as no longer queued (see
	// RequestLimiter.remove).
	rl.queue[min].heappos = -1
	// false = "return 503 now" (see the ready field's comment)
	rl.queue[min].ready <- false
// enqueue builds a queue entry for req — computing its priority via
// rl.Priority, if set — and returns it; the caller then waits on the
// entry's ready channel. When there is spare concurrency capacity,
// the queue is skipped entirely.
func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
	defer rl.mtx.Unlock()
	if rl.Priority != nil {
		priority = rl.Priority(req, qtime)
		// buffered so the sender (runqueue/trimqueue) never blocks
		ready: make(chan bool, 1),
	if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
		// fast path, skip the queue
// remove takes ent out of the queue if it is still queued
// (heappos >= 0 — trimqueue/removal set heappos to -1).
func (rl *RequestLimiter) remove(ent *qent) {
	defer rl.mtx.Unlock()
	if ent.heappos >= 0 {
		rl.queue.remove(ent.heappos)
// ServeHTTP queues the incoming request, waits until it is allowed
// to run (or rejected), and then either passes it to the wrapped
// rl.Handler or responds 503 Service Unavailable.
func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	rl.setupOnce.Do(rl.setup)
	ent := rl.enqueue(req)
	case <-req.Context().Done():
		// we still need to wait for ent.ready, because
		// sometimes runqueue() will have already decided to
		// send true before our rl.remove() call, and in that
		// case we'll need to decrement rl.handling below.
	case ok = <-ent.ready:
		// ok==false: rejected (queue overflow or cancellation);
		// tell the client to come back later.
		resp.WriteHeader(http.StatusServiceUnavailable)
	defer rl.mtx.Unlock()
	// unblock the next waiting request
	rl.Handler.ServeHTTP(resp, req)