1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/sirupsen/logrus"
18 // RequestLimiter wraps http.Handler, limiting the number of
19 // concurrent requests being handled by the wrapped Handler. Requests
20 // that arrive when the handler is already at the specified
21 // concurrency limit are queued and handled in the order indicated by
22 // the Priority function.
24 // Caller must not modify any RequestLimiter fields after calling its
// NOTE(review): the sentence above is cut off in this excerpt —
// presumably it ends "...after calling its methods" (ServeHTTP);
// confirm against the full file.
26 type RequestLimiter struct {
// NOTE(review): several fields are elided from this excerpt (the
// wrapped Handler, the MaxConcurrent/MaxQueue limit fields that the
// comments below document, and the internal mutex/queue/counter
// state referenced by the methods, e.g. rl.mtx, rl.handling,
// rl.queue, rl.setupOnce). Only their doc comments are visible here.
29 // Maximum number of requests being handled at once. Beyond
30 // this limit, requests will be queued.
33 // Maximum number of requests in the queue. Beyond this limit,
34 // the lowest priority requests will return 503.
37 // Priority determines queue ordering. Requests with higher
38 // priority are handled first. Requests with equal priority
39 // are handled FIFO. If Priority is nil, all requests are
// NOTE(review): comment truncated here — presumably "...all requests
// are handled FIFO"; verify in the full source.
41 Priority func(req *http.Request, queued time.Time) int64
43 // "concurrent_requests", "max_concurrent_requests",
44 // "queued_requests", and "max_queued_requests" metrics are
45 // registered with Registry, if it is not nil.
46 Registry *prometheus.Registry
// NOTE(review): this field belongs to the per-request queue-entry
// struct (qent), whose declaration is elided from this excerpt.
// The buffered channel lets exactly one signal be delivered without
// blocking the sender (see enqueue's make(chan bool, 1)).
58 ready chan bool // true = handle now; false = return 503 now
// Swap exchanges the entries at indexes i and j (part of
// heap.Interface) and updates each entry's cached heap index
// (heappos) so it stays consistent — heappos is what lets an entry
// be removed from the middle of the heap later (see queue.remove and
// RequestLimiter.remove).
63 func (h queue) Swap(i, j int) {
64 h[i], h[j] = h[j], h[i]
65 h[i].heappos, h[j].heappos = i, j
// Less orders the heap as a max-heap on priority: an entry with
// higher priority sorts first, and ties are broken FIFO by enqueue
// time (the earlier-queued entry wins).
68 func (h queue) Less(i, j int) bool {
69 pi, pj := h[i].priority, h[j].priority
70 return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
// Len reports the number of queued entries (part of heap.Interface).
// Body elided in this excerpt.
73 func (h queue) Len() int {
// Push appends x (a queue entry) to the backing slice; called by
// container/heap, not directly. Body elided in this excerpt —
// presumably it also sets the new entry's heappos; confirm against
// the full file.
77 func (h *queue) Push(x interface{}) {
// Pop removes and returns the last element of the backing slice;
// called by container/heap, not directly. Body elided in this
// excerpt — presumably it marks the removed entry as no longer
// queued (heappos = -1, per the check in RequestLimiter.remove);
// confirm against the full file.
84 func (h *queue) Pop() interface{} {
// add inserts ent into the heap in priority order. Body elided in
// this excerpt — presumably heap.Push(h, ent); confirm against the
// full file.
93 func (h *queue) add(ent *qent) {
// removeMax pops and returns the highest-priority entry (the heap
// root, per the max-heap ordering defined by Less).
98 func (h *queue) removeMax() *qent {
99 return heap.Pop(h).(*qent)
// remove deletes the entry at heap index i, restoring heap order.
// Body elided in this excerpt — presumably heap.Remove(h, i);
// confirm against the full file.
102 func (h *queue) remove(i int) {
// setup registers the limiter's four gauges with rl.Registry, if one
// was provided. It is invoked exactly once, via rl.setupOnce in
// ServeHTTP. The concurrent_requests and queued_requests gauges read
// mutable shared state (rl.handling, rl.queue) and therefore take
// rl.mtx; the max_* gauges read configuration fields that callers
// must not modify after first use, so they read without locking.
// NOTE(review): several interior lines (closing braces, the
// rl.mtx.Lock() calls paired with the visible deferred Unlocks, and
// the GaugeOpts closing punctuation) are elided from this excerpt.
106 func (rl *RequestLimiter) setup() {
107 if rl.Registry != nil {
108 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
109 prometheus.GaugeOpts{
110 Namespace: "arvados",
111 Name: "concurrent_requests",
112 Help: "Number of requests in progress",
// Gauge callback: snapshot rl.handling under the lock.
116 defer rl.mtx.Unlock()
117 return float64(rl.handling)
120 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
121 prometheus.GaugeOpts{
122 Namespace: "arvados",
123 Name: "max_concurrent_requests",
124 Help: "Maximum number of concurrent requests",
// Config field; immutable after setup, so no lock needed.
126 func() float64 { return float64(rl.MaxConcurrent) },
128 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
129 prometheus.GaugeOpts{
130 Namespace: "arvados",
131 Name: "queued_requests",
132 Help: "Number of requests in queue",
// Gauge callback: snapshot the queue length under the lock.
136 defer rl.mtx.Unlock()
137 return float64(len(rl.queue))
140 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
141 prometheus.GaugeOpts{
142 Namespace: "arvados",
143 Name: "max_queued_requests",
144 Help: "Maximum number of queued requests",
// Config field; immutable after setup, so no lock needed.
146 func() float64 { return float64(rl.MaxQueue) },
151 // caller must have lock
// runqueue drains the head of the priority queue while there is
// spare handling capacity. MaxConcurrent == 0 means "unlimited", so
// the loop then runs until the queue is empty.
152 func (rl *RequestLimiter) runqueue() {
153 // Handle entries from the queue as capacity permits
154 for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
// NOTE(review): the rest of the loop body is elided from this
// excerpt — presumably it increments rl.handling and sends true on
// ent.ready to release the waiting ServeHTTP goroutine (ServeHTTP's
// comments reference exactly that behavior); confirm in the full
// file.
156 ent := rl.queue.removeMax()
161 // If the queue is too full, fail and remove the lowest-priority
162 // entry. Caller must have lock. Queue must not be empty.
163 func (rl *RequestLimiter) trimqueue() {
// Within limit: nothing to trim. (The early return itself is elided
// from this excerpt.)
164 if len(rl.queue) <= rl.MaxQueue {
// Linear scan for the minimum-priority entry. Less(min, i) == true
// means min sorts ahead of i, i.e. i is the worse (lower-priority /
// later-queued) candidate so far.
// NOTE(review): the line assigning min (presumably "min = i") and
// the loop/if closing braces are elided; confirm in the full file.
168 for i := range rl.queue {
169 if i == 0 || rl.queue.Less(min, i) {
// Tell the victim's waiting goroutine to return 503 now. The
// channel is buffered (size 1), so this cannot block.
173 rl.queue[min].ready <- false
// enqueue builds a queue entry for req and decides its immediate
// fate under rl.mtx: start handling right away if there is spare
// capacity; reject outright if the Priority func returned
// math.MinInt64 and capacity is exhausted; otherwise add it to the
// priority queue. The caller (ServeHTTP) then waits on ent.ready.
// NOTE(review): many interior lines are elided from this excerpt —
// the rl.mtx.Lock() paired with the visible deferred Unlock, the
// qtime/priority initialization, the full qent literal, the
// rl.handling++ fast-path bookkeeping, the queue add + trimqueue
// call, and the return statements. Confirm details in the full file.
177 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
179 defer rl.mtx.Unlock()
182 if rl.Priority != nil {
183 priority = rl.Priority(req, qtime)
// Buffered so runqueue/trimqueue can signal without blocking even if
// the waiter has not reached its select yet.
188 ready: make(chan bool, 1),
191 if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
192 // fast path, skip the queue
197 if priority == math.MinInt64 {
198 // Priority func is telling us to return 503
199 // immediately instead of queueing, regardless of
200 // queue size, if we can't handle the request
// remove takes ent out of the queue, if it is still queued
// (heappos >= 0 means it currently occupies a heap slot). Called
// when a waiting request's context is canceled before it is handled.
// NOTE(review): the rl.mtx.Lock() paired with the deferred Unlock
// and the tail of the function (presumably sending false on
// ent.ready, per ServeHTTP's comments) are elided from this excerpt.
210 func (rl *RequestLimiter) remove(ent *qent) {
212 defer rl.mtx.Unlock()
213 if ent.heappos >= 0 {
214 rl.queue.remove(ent.heappos)
// ServeHTTP enqueues the request, waits until it is either admitted
// (ent.ready receives true) or rejected (false → 503), handles
// client disconnects while queued, and finally passes admitted
// requests to the wrapped Handler. On completion it decrements
// rl.handling and calls runqueue() to admit the next waiter.
// NOTE(review): this excerpt elides the select statement's braces,
// the rl.remove(ent) call after context cancellation, the "if !ok"
// handling around the 503, the deferred cleanup's Lock/handling--/
// runqueue() lines, and the closing braces. Confirm in the full file.
219 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Lazy one-time metric registration.
220 rl.setupOnce.Do(rl.setup)
221 ent := rl.enqueue(req)
222 SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
// Client gave up (or request deadline hit) while we were queued.
225 case <-req.Context().Done():
227 // we still need to wait for ent.ready, because
228 // sometimes runqueue() will have already decided to
229 // send true before our rl.remove() call, and in that
230 // case we'll need to decrement rl.handling below.
232 case ok = <-ent.ready:
// Rejected (trimqueue/enqueue sent false): report 503.
235 resp.WriteHeader(http.StatusServiceUnavailable)
240 defer rl.mtx.Unlock()
242 // unblock the next waiting request
245 rl.Handler.ServeHTTP(resp, req)