1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/sirupsen/logrus"
// MinPriority marks a request as lowest-priority; besides sorting last,
// such requests are also subject to the early-503 deadline
// RequestQueue.MaxQueueTimeForMinPriority (see ServeHTTP).
18 const MinPriority = math.MinInt64
20 // Prometheus typically polls every 10 seconds, but it doesn't cost us
21 // much to also accommodate higher frequency collection by updating
22 // internal stats more frequently. (This limits time resolution only
23 // for the metrics that aren't generated on the fly.)
24 const metricsUpdateInterval = time.Second
26 // RequestLimiter wraps http.Handler, limiting the number of
27 // concurrent requests being handled by the wrapped Handler. Requests
28 // that arrive when the handler is already at the specified
29 // concurrency limit are queued and handled in the order indicated by
30 // the Priority function.
32 // Caller must not modify any RequestLimiter fields after calling its
// methods.
34 type RequestLimiter struct {
// NOTE(review): several declarations in this struct are elided from
// this view, including (judging from uses elsewhere in the file) the
// wrapped Handler, rl.mtx, and rl.setupOnce — confirm against the
// full source.
37 // Queue determines which queue a request is assigned to.
38 Queue func(req *http.Request) *RequestQueue
40 // Priority determines queue ordering. Requests with higher
41 // priority are handled first. Requests with equal priority
42 // are handled FIFO. If Priority is nil, all requests are
// handled FIFO.
44 Priority func(req *http.Request, queued time.Time) int64
46 // "concurrent_requests", "max_concurrent_requests",
47 // "queued_requests", and "max_queued_requests" metrics are
48 // registered with Registry, if it is not nil.
49 Registry *prometheus.Registry
// Internal metric vectors, created by setup() when Registry is
// non-nil; nil otherwise.
52 mQueueDelay *prometheus.SummaryVec
53 mQueueTimeout *prometheus.SummaryVec
54 mQueueUsage *prometheus.GaugeVec
56 rqs map[*RequestQueue]bool // all RequestQueues in use
59 type RequestQueue struct {
60 // Label for metrics. No two queues should have the same label.
// (The Label field declaration itself is elided from this view.)
63 // Maximum number of requests being handled at once. Beyond
64 // this limit, requests will be queued.
// (MaxConcurrent declaration elided; 0 evidently means "no limit"
// — see the MaxConcurrent == 0 checks in enqueue and runqueue.)
67 // Maximum number of requests in the queue. Beyond this limit,
68 // the lowest priority requests will return 503.
// (MaxQueue declaration elided — TODO confirm.)
71 // Return 503 for any request for which Priority() returns
72 // MinPriority if it spends longer than this in the queue
73 // before starting processing.
74 MaxQueueTimeForMinPriority time.Duration
// NOTE(review): the declarations between here and the next visible
// line are elided, including the close of RequestQueue and the start
// of the per-request queue-entry struct (used elsewhere as *qent)
// that owns the following field — confirm against the full source.
85 ready chan bool // true = handle now; false = return 503 now
// Swap exchanges entries i and j and keeps each entry's cached heap
// index (heappos) in sync, so later removal by stored heappos stays
// valid after container/heap reorders the slice.
90 func (h queue) Swap(i, j int) {
91 h[i], h[j] = h[j], h[i]
92 h[i].heappos, h[j].heappos = i, j
// Less orders higher-priority entries first, breaking ties FIFO by
// enqueue time. Because container/heap pops the Less-smallest element,
// heap.Pop on this queue yields the highest-priority entry.
95 func (h queue) Less(i, j int) bool {
96 pi, pj := h[i].priority, h[j].priority
97 return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
// Len is part of the container/heap (sort.Interface) implementation.
// (Body elided from this view.)
100 func (h queue) Len() int {
// Push is part of the container/heap implementation; callers are
// expected to use add() rather than calling this directly. (Body
// elided from this view.)
104 func (h *queue) Push(x interface{}) {
// Pop is part of the container/heap implementation; callers are
// expected to use removeMax() rather than calling this directly.
// (Body elided from this view.)
111 func (h *queue) Pop() interface{} {
// add appends ent to the heap, first recording its would-be slice
// position in ent.heappos; the actual heap insertion (presumably
// heap.Push) is in an elided line — TODO confirm.
120 func (h *queue) add(ent *qent) {
121 ent.heappos = h.Len()
// removeMax pops and returns the highest-priority entry: heap.Pop
// yields the Less-smallest element, and Less orders higher priority
// first.
125 func (h *queue) removeMax() *qent {
126 return heap.Pop(h).(*qent)
// remove deletes the entry at heap index i. (Body elided from this
// view.)
129 func (h *queue) remove(i int) {
// setup registers the limiter's Prometheus collectors (when Registry
// is set) and periodically refreshes the gauge metrics every
// metricsUpdateInterval. Invoked exactly once, via rl.setupOnce in
// ServeHTTP.
133 func (rl *RequestLimiter) setup() {
134 if rl.Registry != nil {
135 mCurrentReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
136 Namespace: "arvados",
137 Name: "concurrent_requests",
138 Help: "Number of requests in progress",
139 }, []string{"queue"})
140 rl.Registry.MustRegister(mCurrentReqs)
141 mMaxReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
142 Namespace: "arvados",
143 Name: "max_concurrent_requests",
144 Help: "Maximum number of concurrent requests",
145 }, []string{"queue"})
146 rl.Registry.MustRegister(mMaxReqs)
147 mMaxQueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{
148 Namespace: "arvados",
149 Name: "max_queued_requests",
150 Help: "Maximum number of queued requests",
151 }, []string{"queue"})
152 rl.Registry.MustRegister(mMaxQueue)
153 rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
154 Namespace: "arvados",
155 Name: "queued_requests",
156 Help: "Number of requests in queue",
157 }, []string{"queue", "priority"})
158 rl.Registry.MustRegister(rl.mQueueUsage)
159 rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
160 Namespace: "arvados",
161 Name: "queue_delay_seconds",
162 Help: "Time spent in the incoming request queue before start of processing",
163 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
164 }, []string{"queue", "priority"})
165 rl.Registry.MustRegister(rl.mQueueDelay)
166 rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
167 Namespace: "arvados",
168 Name: "queue_timeout_seconds",
169 Help: "Time spent in the incoming request queue before client timed out or disconnected",
170 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
171 }, []string{"queue", "priority"})
172 rl.Registry.MustRegister(rl.mQueueTimeout)
// Periodically publish per-queue stats. NOTE(review): the line
// launching this loop in a background goroutine is elided from this
// view; the ticker evidently runs for the process lifetime and is
// never stopped — confirm that is intentional against the full
// source.
174 for range time.NewTicker(metricsUpdateInterval).C {
// (Locking of the limiter's mutex around this scan appears
// to be in an elided line — TODO confirm.)
176 for rq := range rl.rqs {
177 var low, normal, high int
// Bucket queued entries by priority sign for the
// queued_requests gauge; the switch scaffolding and
// counter increments are partly elided from this view.
178 for _, ent := range rq.queue {
180 case ent.priority < 0:
182 case ent.priority > 0:
188 mCurrentReqs.WithLabelValues(rq.Label).Set(float64(rq.handling))
189 mMaxReqs.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent))
190 mMaxQueue.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue))
191 rl.mQueueUsage.WithLabelValues(rq.Label, "low").Set(float64(low))
192 rl.mQueueUsage.WithLabelValues(rq.Label, "normal").Set(float64(normal))
193 rl.mQueueUsage.WithLabelValues(rq.Label, "high").Set(float64(high))
201 // caller must have lock
// runqueue starts queued requests while capacity remains:
// MaxConcurrent == 0 means no concurrency limit. The elided loop body
// presumably increments rq.handling and signals ent.ready <- true —
// TODO confirm against the full source.
202 func (rq *RequestQueue) runqueue() {
203 // Handle entries from the queue as capacity permits
204 for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) {
// Highest-priority entry first (see queue.Less / removeMax).
206 ent := rq.queue.removeMax()
211 // If the queue is too full, fail and remove the lowest-priority
212 // entry. Caller must have lock. Queue must not be empty.
213 func (rq *RequestQueue) trimqueue() {
// Within limit — nothing to trim. (The early return itself is in
// an elided line.)
214 if len(rq.queue) <= rq.MaxQueue {
// Linear scan for the Less-greatest entry, i.e. the lowest
// priority (oldest among equals). The index variable `min` is
// declared in an elided line; despite the name it identifies the
// *worst* entry in the queue.
218 for i := range rq.queue {
219 if i == 0 || rq.queue.Less(min, i) {
// Tell the rejected request to return 503. The ready channel is
// buffered (cap 1, see enqueue), so this send cannot block.
// Removal of the entry from the heap happens in elided code —
// TODO confirm.
223 rq.queue[min].ready <- false
// enqueue assigns the incoming request to a queue and either lets it
// start immediately (fast path, spare capacity) or adds it to the
// priority heap. Several lines are elided from this view, including
// the mutex Lock, the qtime/priority/ent initialization, the fast-path
// ready<-true send, and (presumably) a trimqueue call — TODO confirm.
227 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
229 defer rl.mtx.Unlock()
232 if rl.Priority != nil {
233 priority = rl.Priority(req, qtime)
// Buffered so runqueue/trimqueue can signal without blocking.
239 ready: make(chan bool, 1),
// Lazily initialize the set of known queues, consumed by the
// metrics refresh loop in setup().
243 rl.rqs = map[*RequestQueue]bool{}
245 rl.rqs[ent.rq] = true
246 if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling {
247 // fast path, skip the queue
252 ent.rq.queue.add(ent)
// remove takes ent out of its queue if it is still queued; used when a
// request gives up (client disconnect or minimum-priority timeout, see
// ServeHTTP). The mutex Lock line and any follow-up after removal
// (presumably a ready<-false send) are elided — TODO confirm.
257 func (rl *RequestLimiter) remove(ent *qent) {
259 defer rl.mtx.Unlock()
// heappos >= 0 means ent is still in the heap; dequeue paths
// presumably set it negative — TODO confirm against the full
// source.
260 if ent.heappos >= 0 {
261 ent.rq.queue.remove(ent.heappos)
// ServeHTTP implements http.Handler: enqueue the request, wait for a
// go-ahead (or cancellation), record queue-delay metrics, and on
// success invoke the wrapped Handler; otherwise respond 503. Many
// lines of this method are elided from this view; the comments below
// hedge accordingly.
266 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
267 rl.setupOnce.Do(rl.setup)
268 ent := rl.enqueue(req)
269 SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label})
// Lowest-priority requests get an extra deadline: give up their
// queue slot after MaxQueueTimeForMinPriority. (The goroutine
// launch and the rl.remove call surrounding this sleep are elided
// — TODO confirm.)
270 if ent.priority == MinPriority {
271 // Note that MaxQueueTime==0 does not cancel a req
272 // that skips the queue, because in that case
273 // rl.enqueue() has already fired ready<-true and
274 // rl.remove() is a no-op.
276 time.Sleep(ent.rq.MaxQueueTimeForMinPriority)
// Wait for either client disconnect or a processing slot.
282 case <-req.Context().Done():
284 // we still need to wait for ent.ready, because
285 // sometimes runqueue() will have already decided to
286 // send true before our rl.remove() call, and in that
287 // case we'll need to decrement ent.rq.handling below.
289 case ok = <-ent.ready:
292 // Report time spent in queue in the appropriate bucket:
293 // mQueueDelay if the request actually got processed,
294 // mQueueTimeout if it was abandoned or cancelled before
295 // getting a processing slot.
296 var series *prometheus.SummaryVec
298 series = rl.mQueueDelay
300 series = rl.mQueueTimeout
// Pick the priority label matching the queued_requests gauge
// buckets ("low"/"normal"/"high"); the switch scaffolding is
// partly elided from this view.
305 case ent.priority < 0:
307 case ent.priority > 0:
// NOTE(review): prefer time.Since(ent.queued) over
// time.Now().Sub(ent.queued) (staticcheck S1012) when this line
// is next touched.
312 series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
316 resp.WriteHeader(http.StatusServiceUnavailable)
321 defer rl.mtx.Unlock()
323 // unblock the next waiting request
326 rl.Handler.ServeHTTP(resp, req)