20602: Report queue time for pos/neg/zero-priority reqs.
[arvados.git] / sdk / go / httpserver / request_limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package httpserver
6
7 import (
8         "container/heap"
9         "math"
10         "net/http"
11         "sync"
12         "time"
13
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
// IneligibleForQueuePriority is a special value a Priority function
// can return to indicate that the request must never wait in the
// queue: it is handled if there is capacity right now, and otherwise
// it gets a 503 response immediately, regardless of queue size.
const IneligibleForQueuePriority = math.MinInt64
19
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
// concurrency limit are queued and handled in the order indicated by
// the Priority function.
//
// Caller must not modify any RequestLimiter fields after calling its
// methods.
type RequestLimiter struct {
	// Handler is the wrapped handler that serves admitted
	// requests.
	Handler http.Handler

	// Maximum number of requests being handled at once. Beyond
	// this limit, requests will be queued. Zero means no limit.
	MaxConcurrent int

	// Maximum number of requests in the queue. Beyond this limit,
	// the lowest priority requests will return 503.
	MaxQueue int

	// Priority determines queue ordering. Requests with higher
	// priority are handled first. Requests with equal priority
	// are handled FIFO. If Priority is nil, all requests are
	// handled FIFO.
	//
	// Returning IneligibleForQueuePriority means the request
	// should get a 503 right away, rather than being queued, if
	// it cannot be handled immediately.
	Priority func(req *http.Request, queued time.Time) int64

	// "concurrent_requests", "max_concurrent_requests",
	// "queued_requests", "max_queued_requests", and
	// "queue_delay_seconds" metrics (namespace "arvados") are
	// registered with Registry, if it is not nil.
	Registry *prometheus.Registry

	// setupOnce ensures setup (metrics registration) runs once,
	// on the first ServeHTTP call.
	setupOnce sync.Once
	// mQueueDelay records time spent in the queue, labeled by
	// priority class ("low", "normal", "high"). It stays nil
	// when Registry is nil.
	mQueueDelay *prometheus.SummaryVec
	// mtx guards handling and queue.
	mtx sync.Mutex
	// handling is the number of requests that have been admitted
	// and have not finished yet.
	handling int
	// queue holds the requests waiting for capacity.
	queue queue
}
56
// qent is one request's entry in the limiter's queue.
type qent struct {
	queued   time.Time // arrival time; breaks ties between equal priorities
	priority int64     // higher values are handled sooner
	heappos  int       // current index in the queue slice; Pop sets it to -1
	ready    chan bool // true = handle now; false = return 503 now
}

// queue is a priority queue of waiting requests. It implements
// heap.Interface; callers should use add, removeMax, and remove
// rather than calling Push and Pop directly, so that the heap
// invariant is maintained.
type queue []*qent

// Swap exchanges two entries and keeps their heappos fields in sync
// with their new slice positions.
func (h queue) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heappos, h[j].heappos = i, j
}

// Less reports whether entry i should be handled before entry j:
// higher priority first, and for equal priorities, earlier arrival
// first.
func (h queue) Less(i, j int) bool {
	pi, pj := h[i].priority, h[j].priority
	return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
}

// Len returns the number of queued entries.
func (h queue) Len() int {
	return len(h)
}

// Push appends x (a *qent) at the end of the slice. It is the
// heap.Interface primitive and does not restore heap order by
// itself; use add instead.
func (h *queue) Push(x interface{}) {
	n := len(*h)
	ent := x.(*qent)
	ent.heappos = n
	*h = append(*h, ent)
}

// Pop removes and returns the last slice element, marking it as no
// longer queued (heappos -1). It is the heap.Interface primitive;
// use removeMax or remove instead.
func (h *queue) Pop() interface{} {
	n := len(*h)
	ent := (*h)[n-1]
	ent.heappos = -1
	// Clear the slot so the backing array does not keep the
	// entry alive.
	(*h)[n-1] = nil
	*h = (*h)[0 : n-1]
	return ent
}

// add inserts ent into the queue.
//
// This must go through heap.Push (not h.Push) so the new entry is
// sifted up to its proper position; otherwise removeMax can return
// a lower-priority entry while a higher-priority one is waiting.
func (h *queue) add(ent *qent) {
	heap.Push(h, ent)
}

// removeMax removes and returns the entry that should be handled
// next (highest priority, earliest arrival). The queue must not be
// empty.
func (h *queue) removeMax() *qent {
	return heap.Pop(h).(*qent)
}

// remove deletes the entry at slice index i (an entry's heappos).
func (h *queue) remove(i int) {
	heap.Remove(h, i)
}
108
109 func (rl *RequestLimiter) setup() {
110         if rl.Registry != nil {
111                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
112                         prometheus.GaugeOpts{
113                                 Namespace: "arvados",
114                                 Name:      "concurrent_requests",
115                                 Help:      "Number of requests in progress",
116                         },
117                         func() float64 {
118                                 rl.mtx.Lock()
119                                 defer rl.mtx.Unlock()
120                                 return float64(rl.handling)
121                         },
122                 ))
123                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
124                         prometheus.GaugeOpts{
125                                 Namespace: "arvados",
126                                 Name:      "max_concurrent_requests",
127                                 Help:      "Maximum number of concurrent requests",
128                         },
129                         func() float64 { return float64(rl.MaxConcurrent) },
130                 ))
131                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
132                         prometheus.GaugeOpts{
133                                 Namespace: "arvados",
134                                 Name:      "queued_requests",
135                                 Help:      "Number of requests in queue",
136                         },
137                         func() float64 {
138                                 rl.mtx.Lock()
139                                 defer rl.mtx.Unlock()
140                                 return float64(len(rl.queue))
141                         },
142                 ))
143                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
144                         prometheus.GaugeOpts{
145                                 Namespace: "arvados",
146                                 Name:      "max_queued_requests",
147                                 Help:      "Maximum number of queued requests",
148                         },
149                         func() float64 { return float64(rl.MaxQueue) },
150                 ))
151                 rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
152                         Namespace:  "arvados",
153                         Name:       "queue_delay_seconds",
154                         Help:       "Time spent in the incoming request queue",
155                         Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
156                 }, []string{"priority"})
157                 rl.Registry.MustRegister(rl.mQueueDelay)
158         }
159 }
160
161 // caller must have lock
162 func (rl *RequestLimiter) runqueue() {
163         // Handle entries from the queue as capacity permits
164         for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
165                 rl.handling++
166                 ent := rl.queue.removeMax()
167                 ent.ready <- true
168         }
169 }
170
171 // If the queue is too full, fail and remove the lowest-priority
172 // entry. Caller must have lock. Queue must not be empty.
173 func (rl *RequestLimiter) trimqueue() {
174         if len(rl.queue) <= rl.MaxQueue {
175                 return
176         }
177         min := 0
178         for i := range rl.queue {
179                 if i == 0 || rl.queue.Less(min, i) {
180                         min = i
181                 }
182         }
183         rl.queue[min].ready <- false
184         rl.queue.remove(min)
185 }
186
187 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
188         rl.mtx.Lock()
189         defer rl.mtx.Unlock()
190         qtime := time.Now()
191         var priority int64
192         if rl.Priority != nil {
193                 priority = rl.Priority(req, qtime)
194         }
195         ent := &qent{
196                 queued:   qtime,
197                 priority: priority,
198                 ready:    make(chan bool, 1),
199                 heappos:  -1,
200         }
201         if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
202                 // fast path, skip the queue
203                 rl.handling++
204                 ent.ready <- true
205                 return ent
206         }
207         if priority == IneligibleForQueuePriority {
208                 // Priority func is telling us to return 503
209                 // immediately instead of queueing, regardless of
210                 // queue size, if we can't handle the request
211                 // immediately.
212                 ent.ready <- false
213                 return ent
214         }
215         rl.queue.add(ent)
216         rl.trimqueue()
217         return ent
218 }
219
220 func (rl *RequestLimiter) remove(ent *qent) {
221         rl.mtx.Lock()
222         defer rl.mtx.Unlock()
223         if ent.heappos >= 0 {
224                 rl.queue.remove(ent.heappos)
225                 ent.ready <- false
226         }
227 }
228
229 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
230         rl.setupOnce.Do(rl.setup)
231         ent := rl.enqueue(req)
232         SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
233         var ok bool
234         var abandoned bool
235         select {
236         case <-req.Context().Done():
237                 abandoned = true
238                 rl.remove(ent)
239                 // we still need to wait for ent.ready, because
240                 // sometimes runqueue() will have already decided to
241                 // send true before our rl.remove() call, and in that
242                 // case we'll need to decrement rl.handling below.
243                 ok = <-ent.ready
244         case ok = <-ent.ready:
245         }
246
247         // report time spent in queue
248         var qlabel string
249         switch {
250         case abandoned:
251         case !ok && ent.priority == IneligibleForQueuePriority:
252                 // don't pollute stats
253         case ent.priority < 0:
254                 qlabel = "low"
255         case ent.priority > 0:
256                 qlabel = "high"
257         default:
258                 qlabel = "normal"
259         }
260         if qlabel != "" && rl.mQueueDelay != nil {
261                 rl.mQueueDelay.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
262         }
263
264         if !ok {
265                 resp.WriteHeader(http.StatusServiceUnavailable)
266                 return
267         }
268         defer func() {
269                 rl.mtx.Lock()
270                 defer rl.mtx.Unlock()
271                 rl.handling--
272                 // unblock the next waiting request
273                 rl.runqueue()
274         }()
275         rl.Handler.ServeHTTP(resp, req)
276 }