1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/sirupsen/logrus"
// MinPriority is the lowest possible request priority. ServeHTTP treats
// requests whose Priority() result equals MinPriority specially: they
// may be rejected with 503 after MaxQueueTimeForMinPriority in the queue.
18 const MinPriority = math.MinInt64
20 // Prometheus typically polls every 10 seconds, but it doesn't cost us
21 // much to also accommodate higher frequency collection by updating
22 // internal stats more frequently. (This limits time resolution only
23 // for the metrics that aren't generated on the fly.)
24 const metricsUpdateInterval = time.Second
26 // RequestLimiter wraps http.Handler, limiting the number of
27 // concurrent requests being handled by the wrapped Handler. Requests
28 // that arrive when the handler is already at the specified
29 // concurrency limit are queued and handled in the order indicated by
30 // the Priority function.
32 // Caller must not modify any RequestLimiter fields after calling its methods.
34 type RequestLimiter struct {
	// NOTE(review): several fields are elided from this view — the
	// methods below also reference Handler, MaxConcurrent, MaxQueue,
	// setupOnce, mtx, handling, and queue; confirm against the full file.
37 // Maximum number of requests being handled at once. Beyond
38 // this limit, requests will be queued.
	// (The MaxConcurrent field declaration itself is elided here.)
41 // Maximum number of requests in the queue. Beyond this limit,
42 // the lowest priority requests will return 503.
	// (The MaxQueue field declaration itself is elided here.)
45 // Priority determines queue ordering. Requests with higher
46 // priority are handled first. Requests with equal priority
47 // are handled FIFO. If Priority is nil, all requests are
	// handled FIFO.
49 Priority func(req *http.Request, queued time.Time) int64
51 // Return 503 for any request for which Priority() returns
52 // MinPriority if it spends longer than this in the queue
53 // before starting processing.
54 MaxQueueTimeForMinPriority time.Duration
56 // "concurrent_requests", "max_concurrent_requests",
57 // "queued_requests", and "max_queued_requests" metrics are
58 // registered with Registry, if it is not nil.
59 Registry *prometheus.Registry
	// Internal metrics, initialized by setup(); see queue_delay_seconds,
	// queue_timeout_seconds, and queued_requests registrations there.
62 mQueueDelay *prometheus.SummaryVec
63 mQueueTimeout *prometheus.SummaryVec
64 mQueueUsage *prometheus.GaugeVec
// NOTE(review): this field appears to belong to a queue-entry struct
// (used via ent.ready below) whose declaration is elided from this view.
74 ready chan bool // true = handle now; false = return 503 now
// Swap exchanges entries i and j and keeps each entry's cached heap
// index (heappos) consistent (part of the container/heap interface).
79 func (h queue) Swap(i, j int) {
80 h[i], h[j] = h[j], h[i]
81 h[i].heappos, h[j].heappos = i, j
// Less orders entries highest-priority-first; equal priorities fall
// back to FIFO order by enqueue time (part of container/heap interface).
84 func (h queue) Less(i, j int) bool {
85 pi, pj := h[i].priority, h[j].priority
86 return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
// Len reports the number of queued entries (container/heap interface).
// Body elided from this view.
89 func (h queue) Len() int {
// Push appends an element (container/heap interface; called by
// heap.Push, not directly). Body elided from this view.
93 func (h *queue) Push(x interface{}) {
// Pop removes and returns the last element (container/heap interface;
// called by heap.Pop, not directly). Body elided from this view.
100 func (h *queue) Pop() interface{} {
// add inserts ent into the heap, first recording the index it will
// occupy so Swap can keep heappos accurate. (The heap.Push call that
// presumably follows is elided from this view.)
109 func (h *queue) add(ent *qent) {
110 ent.heappos = h.Len()
// removeMax pops and returns the best entry — highest priority, ties
// FIFO — per the ordering defined by Less.
114 func (h *queue) removeMax() *qent {
115 return heap.Pop(h).(*qent)
// remove deletes the entry at heap index i. Body elided from this view.
118 func (h *queue) remove(i int) {
// setup registers the limiter's metrics with rl.Registry (if non-nil)
// and starts the periodic queue-usage updater. It runs exactly once,
// via rl.setupOnce.Do(rl.setup) in ServeHTTP.
122 func (rl *RequestLimiter) setup() {
123 if rl.Registry != nil {
	// concurrent_requests is computed on the fly from rl.handling,
	// under rl.mtx.
124 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
125 prometheus.GaugeOpts{
126 Namespace: "arvados",
127 Name: "concurrent_requests",
128 Help: "Number of requests in progress",
132 defer rl.mtx.Unlock()
133 return float64(rl.handling)
136 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
137 prometheus.GaugeOpts{
138 Namespace: "arvados",
139 Name: "max_concurrent_requests",
140 Help: "Maximum number of concurrent requests",
142 func() float64 { return float64(rl.MaxConcurrent) },
	// queued_requests is labeled by priority bucket ("low"/"normal"/
	// "high") and refreshed by the ticker loop below.
144 rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
145 Namespace: "arvados",
146 Name: "queued_requests",
147 Help: "Number of requests in queue",
148 }, []string{"priority"})
149 rl.Registry.MustRegister(rl.mQueueUsage)
150 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
151 prometheus.GaugeOpts{
152 Namespace: "arvados",
153 Name: "max_queued_requests",
154 Help: "Maximum number of queued requests",
156 func() float64 { return float64(rl.MaxQueue) },
158 rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
159 Namespace: "arvados",
160 Name: "queue_delay_seconds",
161 Help: "Time spent in the incoming request queue before start of processing",
162 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
163 }, []string{"priority"})
164 rl.Registry.MustRegister(rl.mQueueDelay)
165 rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
166 Namespace: "arvados",
167 Name: "queue_timeout_seconds",
168 Help: "Time spent in the incoming request queue before client timed out or disconnected",
169 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
170 }, []string{"priority"})
171 rl.Registry.MustRegister(rl.mQueueTimeout)
	// NOTE(review): this ticker loop presumably runs in a goroutine
	// started here (the `go func()` line is elided from this view).
	// The ticker is never stopped — acceptable only if the limiter
	// lives for the whole process lifetime; confirm.
173 for range time.NewTicker(metricsUpdateInterval).C {
174 var low, normal, high int
	// Bucket queued entries by priority sign: <0 low, >0 high,
	// ==0 normal (the switch header and 0 case are elided here).
176 for _, ent := range rl.queue {
178 case ent.priority < 0:
180 case ent.priority > 0:
187 rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
188 rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
189 rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
195 // caller must have lock
196 func (rl *RequestLimiter) runqueue() {
197 // Handle entries from the queue as capacity permits
	// MaxConcurrent == 0 means "no concurrency limit".
198 for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
	// Take the highest-priority entry; the elided lines presumably
	// mark it as handling and fire ent.ready<-true — confirm.
200 ent := rl.queue.removeMax()
205 // If the queue is too full, fail and remove the lowest-priority
206 // entry. Caller must have lock. Queue must not be empty.
207 func (rl *RequestLimiter) trimqueue() {
208 if len(rl.queue) <= rl.MaxQueue {
	// Linear scan for the entry that sorts last under Less, i.e. the
	// lowest-priority / most recently queued entry. (The `min = i`
	// assignment and early return are elided from this view.)
212 for i := range rl.queue {
213 if i == 0 || rl.queue.Less(min, i) {
	// Sending false tells that request's ServeHTTP to respond 503.
217 rl.queue[min].ready <- false
// enqueue creates a queue entry for req, computing its priority via
// rl.Priority (if set; priority presumably defaults to 0 otherwise —
// the declaration is elided). Caller gets the entry and later waits on
// ent.ready.
221 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
223 defer rl.mtx.Unlock()
226 if rl.Priority != nil {
227 priority = rl.Priority(req, qtime)
	// Buffer of 1 lets runqueue/trimqueue send the verdict without
	// blocking, even if the requester has already gone away.
232 ready: make(chan bool, 1),
	// MaxConcurrent == 0 means unlimited, so the queue can be skipped.
235 if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
236 // fast path, skip the queue
// remove takes ent out of the queue if it is still there. heappos >= 0
// indicates ent is still queued (presumably set to a negative sentinel
// once dequeued — the elided lines should confirm); this makes remove
// a safe no-op for entries that already left the queue.
246 func (rl *RequestLimiter) remove(ent *qent) {
248 defer rl.mtx.Unlock()
249 if ent.heappos >= 0 {
250 rl.queue.remove(ent.heappos)
// ServeHTTP implements http.Handler: it enqueues the request, waits
// until capacity permits (or the client gives up / the queue rejects
// it), records queue-time metrics, then either responds 503 or hands
// the request to the wrapped rl.Handler.
255 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
256 rl.setupOnce.Do(rl.setup)
257 ent := rl.enqueue(req)
258 SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
	// MinPriority requests get a watchdog: after
	// MaxQueueTimeForMinPriority they are removed from the queue
	// (the surrounding goroutine/remove calls are elided here).
259 if ent.priority == MinPriority {
260 // Note that MaxQueueTime==0 does not cancel a req
261 // that skips the queue, because in that case
262 // rl.enqueue() has already fired ready<-true and
263 // rl.remove() is a no-op.
265 time.Sleep(rl.MaxQueueTimeForMinPriority)
	// Wait for either client disconnect or a verdict on ent.ready.
271 case <-req.Context().Done():
273 // we still need to wait for ent.ready, because
274 // sometimes runqueue() will have already decided to
275 // send true before our rl.remove() call, and in that
276 // case we'll need to decrement rl.handling below.
	// ok == true: handle now; ok == false: respond 503.
278 case ok = <-ent.ready:
281 // Report time spent in queue in the appropriate bucket:
282 // mQueueDelay if the request actually got processed,
283 // mQueueTimeout if it was abandoned or cancelled before
284 // getting a processing slot.
285 var series *prometheus.SummaryVec
287 series = rl.mQueueDelay
289 series = rl.mQueueTimeout
	// qlabel buckets by priority sign, mirroring setup()'s ticker loop
	// (switch header and "normal" case elided from this view).
294 case ent.priority < 0:
296 case ent.priority > 0:
	// NOTE(review): time.Since(ent.queued) is the idiomatic spelling
	// of time.Now().Sub(ent.queued).
301 series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
	// Rejected (ok == false) requests get 503 Service Unavailable.
305 resp.WriteHeader(http.StatusServiceUnavailable)
310 defer rl.mtx.Unlock()
312 // unblock the next waiting request
315 rl.Handler.ServeHTTP(resp, req)