20602: Add queue metrics.
arvados.git: sdk/go/httpserver/request_limiter.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package httpserver

import (
        "container/heap"
        "math"
        "net/http"
        "sync"
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
)

// IneligibleForQueuePriority is a Priority return value that tells
// RequestLimiter to respond 503 immediately, instead of queueing the
// request, if it cannot be handled at once.
const IneligibleForQueuePriority = math.MinInt64

// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
// concurrency limit are queued and handled in the order indicated by
// the Priority function.
//
// Caller must not modify any RequestLimiter fields after calling its
// methods.
type RequestLimiter struct {
        Handler http.Handler

        // Maximum number of requests being handled at once. Beyond
        // this limit, requests will be queued.
        MaxConcurrent int

        // Maximum number of requests in the queue. Beyond this limit,
        // the lowest priority requests will return 503.
        MaxQueue int

        // Priority determines queue ordering. Requests with higher
        // priority are handled first. Requests with equal priority
        // are handled FIFO. If Priority is nil, all requests are
        // handled FIFO.
        Priority func(req *http.Request, queued time.Time) int64

        // The "concurrent_requests", "max_concurrent_requests",
        // "queued_requests", and "max_queued_requests" gauges, and
        // the "queue_delay_seconds" summary, are registered with
        // Registry, if it is not nil.
        Registry *prometheus.Registry

        setupOnce   sync.Once
        mtx         sync.Mutex
        handling    int
        queue       queue
        mQueueDelay prometheus.Summary
}

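// Usage sketch (illustrative, not part of the original source): wrap
// an existing handler and give interactive requests priority over
// batch work. The "X-Request-Class" header is a hypothetical example,
// not something this package inspects.
//
//      rl := &RequestLimiter{
//              Handler:       mux,
//              MaxConcurrent: 64,
//              MaxQueue:      128,
//              Priority: func(req *http.Request, queued time.Time) int64 {
//                      if req.Header.Get("X-Request-Class") == "interactive" {
//                              return 1
//                      }
//                      return 0
//              },
//      }
//      srv := &http.Server{Addr: ":8080", Handler: rl}
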
type qent struct {
        queued   time.Time
        priority int64
        heappos  int
        ready    chan bool // true = handle now; false = return 503 now
}

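// queue implements container/heap's heap.Interface as a max-heap:
// entries with higher priority sort first, ties broken by earlier
// queued time, so heap.Pop returns the next request to handle.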
type queue []*qent

func (h queue) Swap(i, j int) {
        h[i], h[j] = h[j], h[i]
        h[i].heappos, h[j].heappos = i, j
}

func (h queue) Less(i, j int) bool {
        pi, pj := h[i].priority, h[j].priority
        return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
}

func (h queue) Len() int {
        return len(h)
}

func (h *queue) Push(x interface{}) {
        n := len(*h)
        ent := x.(*qent)
        ent.heappos = n
        *h = append(*h, ent)
}

func (h *queue) Pop() interface{} {
        n := len(*h)
        ent := (*h)[n-1]
        ent.heappos = -1
        (*h)[n-1] = nil
        *h = (*h)[0 : n-1]
        return ent
}

func (h *queue) add(ent *qent) {
        // Use heap.Push, not h.Push, so the heap invariant is
        // restored after the new entry is appended.
        heap.Push(h, ent)
}

func (h *queue) removeMax() *qent {
        return heap.Pop(h).(*qent)
}

func (h *queue) remove(i int) {
        heap.Remove(h, i)
}

func (rl *RequestLimiter) setup() {
        if rl.Registry != nil {
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "concurrent_requests",
                                Help:      "Number of requests in progress",
                        },
                        func() float64 {
                                rl.mtx.Lock()
                                defer rl.mtx.Unlock()
                                return float64(rl.handling)
                        },
                ))
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "max_concurrent_requests",
                                Help:      "Maximum number of concurrent requests",
                        },
                        func() float64 { return float64(rl.MaxConcurrent) },
                ))
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "queued_requests",
                                Help:      "Number of requests in queue",
                        },
                        func() float64 {
                                rl.mtx.Lock()
                                defer rl.mtx.Unlock()
                                return float64(len(rl.queue))
                        },
                ))
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "max_queued_requests",
                                Help:      "Maximum number of queued requests",
                        },
                        func() float64 { return float64(rl.MaxQueue) },
                ))
                rl.mQueueDelay = prometheus.NewSummary(prometheus.SummaryOpts{
                        Namespace:  "arvados",
                        Name:       "queue_delay_seconds",
                        Help:       "Number of seconds spent in the incoming request queue",
                        Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
                })
                rl.Registry.MustRegister(rl.mQueueDelay)
        }
}

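// Metrics sketch (illustrative): the gauges and summary above are
// only registered when Registry is set; one way to expose them is
// promhttp from github.com/prometheus/client_golang/prometheus/promhttp.
//
//      reg := prometheus.NewRegistry()
//      rl := &RequestLimiter{Handler: mux, MaxConcurrent: 64, Registry: reg}
//      http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
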
// caller must have lock
func (rl *RequestLimiter) runqueue() {
        // Handle entries from the queue as capacity permits
        for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
                rl.handling++
                ent := rl.queue.removeMax()
                ent.ready <- true
        }
}

// If the queue is too full, fail and remove the lowest-priority
// entry. Caller must have lock. Queue must not be empty.
func (rl *RequestLimiter) trimqueue() {
        if len(rl.queue) <= rl.MaxQueue {
                return
        }
        min := 0
        for i := range rl.queue {
                if i == 0 || rl.queue.Less(min, i) {
                        min = i
                }
        }
        rl.queue[min].ready <- false
        rl.queue.remove(min)
}

// enqueue returns a qent for the given request. The caller must wait
// on the qent's ready channel: true means handle the request now,
// false means respond 503 now.
func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
        rl.mtx.Lock()
        defer rl.mtx.Unlock()
        qtime := time.Now()
        var priority int64
        if rl.Priority != nil {
                priority = rl.Priority(req, qtime)
        }
        ent := &qent{
                queued:   qtime,
                priority: priority,
                ready:    make(chan bool, 1),
                heappos:  -1,
        }
        if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
                // fast path, skip the queue
                rl.handling++
                ent.ready <- true
                return ent
        }
        if priority == IneligibleForQueuePriority {
                // Priority func is telling us to return 503
                // immediately instead of queueing, regardless of
                // queue size, if we can't handle the request
                // immediately.
                ent.ready <- false
                return ent
        }
        rl.queue.add(ent)
        rl.trimqueue()
        return ent
}

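// Priority sketch (illustrative): a Priority func can return
// IneligibleForQueuePriority to make requests fail fast with 503
// instead of queueing; the "/ping" path check below is hypothetical.
//
//      Priority: func(req *http.Request, _ time.Time) int64 {
//              if req.URL.Path == "/ping" {
//                      return IneligibleForQueuePriority
//              }
//              return 0
//      },
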
// remove deletes ent from the queue, if it is still queued, and
// signals the waiter to return 503.
func (rl *RequestLimiter) remove(ent *qent) {
        rl.mtx.Lock()
        defer rl.mtx.Unlock()
        if ent.heappos >= 0 {
                rl.queue.remove(ent.heappos)
                ent.ready <- false
        }
}

func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rl.setupOnce.Do(rl.setup)
        ent := rl.enqueue(req)
        SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
        var ok bool
        select {
        case <-req.Context().Done():
                rl.remove(ent)
                // we still need to wait for ent.ready, because
                // sometimes runqueue() will have already decided to
                // send true before our rl.remove() call, and in that
                // case we'll need to decrement rl.handling below.
                ok = <-ent.ready
        case ok = <-ent.ready:
        }
        if rl.mQueueDelay != nil {
                // report time spent in the queue (~zero on the fast
                // path)
                rl.mQueueDelay.Observe(time.Since(ent.queued).Seconds())
        }
        if !ok {
                resp.WriteHeader(http.StatusServiceUnavailable)
                return
        }
        defer func() {
                rl.mtx.Lock()
                defer rl.mtx.Unlock()
                rl.handling--
                // unblock the next waiting request
                rl.runqueue()
        }()
        rl.Handler.ServeHTTP(resp, req)
}