1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/sirupsen/logrus"
// IneligibleForQueuePriority is a sentinel value for the Priority
// hook: a request assigned this priority is never queued — if it
// cannot be handled immediately, it receives a 503 right away (see
// the check in enqueue).
18 const IneligibleForQueuePriority = math.MinInt64
20 // RequestLimiter wraps http.Handler, limiting the number of
21 // concurrent requests being handled by the wrapped Handler. Requests
22 // that arrive when the handler is already at the specified
23 // concurrency limit are queued and handled in the order indicated by
24 // the Priority function.
26 // Caller must not modify any RequestLimiter fields after calling its
// ServeHTTP method. (NOTE(review): the sentence is truncated in this
// excerpt; presumably it ends "...after calling its ServeHTTP
// method" — confirm against the full file.)
28 type RequestLimiter struct {
31 // Maximum number of requests being handled at once. Beyond
32 // this limit, requests will be queued.
35 // Maximum number of requests in the queue. Beyond this limit,
36 // the lowest priority requests will return 503.
39 // Priority determines queue ordering. Requests with higher
40 // priority are handled first. Requests with equal priority
41 // are handled FIFO. If Priority is nil, all requests are
// handled FIFO.
43 Priority func(req *http.Request, queued time.Time) int64
45 // "concurrent_requests", "max_concurrent_requests",
46 // "queued_requests", and "max_queued_requests" metrics are
47 // registered with Registry, if it is not nil.
48 Registry *prometheus.Registry
// mQueueDelay records time spent in the queue, labeled by
// priority class; created and registered in setup() as
// "arvados_queue_delay_seconds".
51 mQueueDelay *prometheus.SummaryVec
// NOTE(review): the "ready" field below appears to belong to the
// per-request queue-entry type (its declaration is elided in this
// excerpt), not to RequestLimiter itself — confirm against the
// full file. The decision channel carries true to proceed with
// handling, false to respond 503.
61 ready chan bool // true = handle now; false = return 503 now
66 func (h queue) Swap(i, j int) {
67 h[i], h[j] = h[j], h[i]
68 h[i].heappos, h[j].heappos = i, j
71 func (h queue) Less(i, j int) bool {
72 pi, pj := h[i].priority, h[j].priority
73 return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
// Len implements heap.Interface/sort.Interface. (Body elided in this
// excerpt.)
76 func (h queue) Len() int {
// Push implements heap.Interface. (Body elided in this excerpt.)
80 func (h *queue) Push(x interface{}) {
// Pop implements heap.Interface. (Body elided in this excerpt.)
87 func (h *queue) Pop() interface{} {
// add inserts ent into the queue. (Body elided in this excerpt;
// presumably it pushes ent onto the heap — confirm against the full
// file.)
96 func (h *queue) add(ent *qent) {
101 func (h *queue) removeMax() *qent {
102 return heap.Pop(h).(*qent)
// remove deletes the entry at heap index i. (Body elided in this
// excerpt.)
105 func (h *queue) remove(i int) {
// setup registers the limiter's Prometheus metrics with rl.Registry
// (if non-nil). It runs exactly once, via rl.setupOnce in ServeHTTP.
109 func (rl *RequestLimiter) setup() {
110 if rl.Registry != nil {
// Gauge of requests currently being handled, read on demand.
111 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
112 prometheus.GaugeOpts{
113 Namespace: "arvados",
114 Name: "concurrent_requests",
115 Help: "Number of requests in progress",
// The callback reads rl.handling under the limiter's mutex
// (the matching Lock call is elided in this excerpt).
119 defer rl.mtx.Unlock()
120 return float64(rl.handling)
// Static gauge exposing the configured concurrency limit.
123 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
124 prometheus.GaugeOpts{
125 Namespace: "arvados",
126 Name: "max_concurrent_requests",
127 Help: "Maximum number of concurrent requests",
129 func() float64 { return float64(rl.MaxConcurrent) },
// Gauge of requests currently waiting in the queue.
131 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
132 prometheus.GaugeOpts{
133 Namespace: "arvados",
134 Name: "queued_requests",
135 Help: "Number of requests in queue",
// As above, the queue length is read under the mutex.
139 defer rl.mtx.Unlock()
140 return float64(len(rl.queue))
// Static gauge exposing the configured queue-size limit.
143 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
144 prometheus.GaugeOpts{
145 Namespace: "arvados",
146 Name: "max_queued_requests",
147 Help: "Maximum number of queued requests",
149 func() float64 { return float64(rl.MaxQueue) },
// Summary of time-in-queue, labeled by priority class; observed
// in ServeHTTP after a request leaves the queue.
151 rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
152 Namespace: "arvados",
153 Name: "queue_delay_seconds",
154 Help: "Time spent in the incoming request queue",
155 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
156 }, []string{"priority"})
157 rl.Registry.MustRegister(rl.mQueueDelay)
161 // caller must have lock
// runqueue admits queued requests while there is spare handling
// capacity, taking the highest-priority entry each iteration.
162 func (rl *RequestLimiter) runqueue() {
163 // Handle entries from the queue as capacity permits
// MaxConcurrent == 0 means unlimited concurrency (same convention
// as the fast-path check in enqueue).
164 for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
166 ent := rl.queue.removeMax()
// (Loop body continues beyond this excerpt; presumably it counts
// ent as handling and signals ent.ready — confirm in full file.)
171 // If the queue is too full, fail and remove the lowest-priority
172 // entry. Caller must have lock. Queue must not be empty.
173 func (rl *RequestLimiter) trimqueue() {
// Within limit: nothing to trim (early return elided in excerpt).
174 if len(rl.queue) <= rl.MaxQueue {
// Linear scan for the lowest-priority entry. Less() sorts higher
// priority first, so Less(min, i) true means entry i is lower
// priority than the current candidate; min ends up indexing the
// lowest-priority entry (min-update line elided in this excerpt).
178 for i := range rl.queue {
179 if i == 0 || rl.queue.Less(min, i) {
// Sending false tells that entry's waiting ServeHTTP to respond
// 503 instead of handling the request.
183 rl.queue[min].ready <- false
// enqueue computes the request's priority (via rl.Priority, if set)
// and builds its queue entry. Under the lock it either admits the
// request immediately (fast path, when below the concurrency limit)
// or — for IneligibleForQueuePriority — arranges an immediate 503
// instead of queueing. (Intervening lines elided in this excerpt;
// presumably other requests are added to the queue — confirm in the
// full file.)
187 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
189 defer rl.mtx.Unlock()
192 if rl.Priority != nil {
193 priority = rl.Priority(req, qtime)
// Buffered (cap 1) so the go/no-go decision can be sent without
// blocking, even if the waiter has not reached its receive yet.
198 ready: make(chan bool, 1),
// Fast path: capacity available (0 means unlimited), skip queue.
201 if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
202 // fast path, skip the queue
207 if priority == IneligibleForQueuePriority {
208 // Priority func is telling us to return 503
209 // immediately instead of queueing, regardless of
210 // queue size, if we can't handle the request
// remove takes ent out of the queue if it is still queued (e.g.
// because the client gave up before the entry was admitted).
220 func (rl *RequestLimiter) remove(ent *qent) {
222 defer rl.mtx.Unlock()
// heappos >= 0 means ent is still in the heap; Swap keeps heappos
// current as entries move, so this index removal is safe.
223 if ent.heappos >= 0 {
224 rl.queue.remove(ent.heappos)
// ServeHTTP runs each request through the limiter: lazily registers
// metrics (once), enqueues the request, waits for a go/no-go decision
// on ent.ready (or client cancellation), records queue-delay stats,
// then either responds 503 or delegates to the wrapped Handler.
229 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
230 rl.setupOnce.Do(rl.setup)
231 ent := rl.enqueue(req)
232 SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
// Client canceled or timed out while we were queued/handling.
236 case <-req.Context().Done():
239 // we still need to wait for ent.ready, because
240 // sometimes runqueue() will have already decided to
241 // send true before our rl.remove() call, and in that
242 // case we'll need to decrement rl.handling below.
// ok==true: proceed with handling; ok==false: respond 503.
244 case ok = <-ent.ready:
247 // report time spent in queue
// Map the priority sign to a metric label (assignments elided in
// this excerpt).
251 case !ok && ent.priority == IneligibleForQueuePriority:
252 // don't pollute stats
253 case ent.priority < 0:
255 case ent.priority > 0:
260 if qlabel != "" && rl.mQueueDelay != nil {
// NOTE(review): time.Since(ent.queued) is the idiomatic form of
// time.Now().Sub(ent.queued) (flagged by gocritic/staticcheck).
261 rl.mQueueDelay.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
// Rejected (ok == false): tell the client we are over capacity.
265 resp.WriteHeader(http.StatusServiceUnavailable)
270 defer rl.mtx.Unlock()
272 // unblock the next waiting request
275 rl.Handler.ServeHTTP(resp, req)