402de3e10cac30df68593156882abd10621b467b
[arvados.git] / sdk / go / httpserver / request_limiter.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package httpserver
6
7 import (
8         "container/heap"
9         "math"
10         "net/http"
11         "sync"
12         "time"
13
14         "github.com/prometheus/client_golang/prometheus"
15         "github.com/sirupsen/logrus"
16 )
17
18 // RequestLimiter wraps http.Handler, limiting the number of
19 // concurrent requests being handled by the wrapped Handler. Requests
20 // that arrive when the handler is already at the specified
21 // concurrency limit are queued and handled in the order indicated by
22 // the Priority function.
23 //
24 // Caller must not modify any RequestLimiter fields after calling its
25 // methods.
type RequestLimiter struct {
        Handler http.Handler

        // Maximum number of requests being handled at once. Beyond
        // this limit, requests will be queued. Zero means no
        // concurrency limit (all requests are handled immediately).
        MaxConcurrent int

        // Maximum number of requests in the queue. Beyond this limit,
        // the lowest priority requests will return 503.
        MaxQueue int

        // Priority determines queue ordering. Requests with higher
        // priority are handled first. Requests with equal priority
        // are handled FIFO. If Priority is nil, all requests are
        // handled FIFO. A returned priority of math.MinInt64 means
        // "return 503 instead of queueing" when at capacity.
        Priority func(req *http.Request, queued time.Time) int64

        // "concurrent_requests", "max_concurrent_requests",
        // "queued_requests", and "max_queued_requests" metrics are
        // registered with Registry, if it is not nil.
        Registry *prometheus.Registry

        setupOnce sync.Once  // ensures setup() runs exactly once, on first request
        mtx       sync.Mutex // guards handling and queue
        handling  int        // number of requests currently being handled
        queue     queue      // priority heap of requests waiting for capacity
}
53
54 type qent struct {
55         queued   time.Time
56         priority int64
57         heappos  int
58         ready    chan bool // true = handle now; false = return 503 now
59 }
60
61 type queue []*qent
62
63 func (h queue) Swap(i, j int) {
64         h[i], h[j] = h[j], h[i]
65         h[i].heappos, h[j].heappos = i, j
66 }
67
68 func (h queue) Less(i, j int) bool {
69         pi, pj := h[i].priority, h[j].priority
70         return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
71 }
72
73 func (h queue) Len() int {
74         return len(h)
75 }
76
77 func (h *queue) Push(x interface{}) {
78         n := len(*h)
79         ent := x.(*qent)
80         ent.heappos = n
81         *h = append(*h, ent)
82 }
83
84 func (h *queue) Pop() interface{} {
85         n := len(*h)
86         ent := (*h)[n-1]
87         ent.heappos = -1
88         (*h)[n-1] = nil
89         *h = (*h)[0 : n-1]
90         return ent
91 }
92
93 func (h *queue) add(ent *qent) {
94         ent.heappos = h.Len()
95         h.Push(ent)
96 }
97
98 func (h *queue) removeMax() *qent {
99         return heap.Pop(h).(*qent)
100 }
101
102 func (h *queue) remove(i int) {
103         heap.Remove(h, i)
104 }
105
106 func (rl *RequestLimiter) setup() {
107         if rl.Registry != nil {
108                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
109                         prometheus.GaugeOpts{
110                                 Namespace: "arvados",
111                                 Name:      "concurrent_requests",
112                                 Help:      "Number of requests in progress",
113                         },
114                         func() float64 {
115                                 rl.mtx.Lock()
116                                 defer rl.mtx.Unlock()
117                                 return float64(rl.handling)
118                         },
119                 ))
120                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
121                         prometheus.GaugeOpts{
122                                 Namespace: "arvados",
123                                 Name:      "max_concurrent_requests",
124                                 Help:      "Maximum number of concurrent requests",
125                         },
126                         func() float64 { return float64(rl.MaxConcurrent) },
127                 ))
128                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
129                         prometheus.GaugeOpts{
130                                 Namespace: "arvados",
131                                 Name:      "queued_requests",
132                                 Help:      "Number of requests in queue",
133                         },
134                         func() float64 {
135                                 rl.mtx.Lock()
136                                 defer rl.mtx.Unlock()
137                                 return float64(len(rl.queue))
138                         },
139                 ))
140                 rl.Registry.MustRegister(prometheus.NewGaugeFunc(
141                         prometheus.GaugeOpts{
142                                 Namespace: "arvados",
143                                 Name:      "max_queued_requests",
144                                 Help:      "Maximum number of queued requests",
145                         },
146                         func() float64 { return float64(rl.MaxQueue) },
147                 ))
148         }
149 }
150
151 // caller must have lock
152 func (rl *RequestLimiter) runqueue() {
153         // Handle entries from the queue as capacity permits
154         for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
155                 rl.handling++
156                 ent := rl.queue.removeMax()
157                 ent.ready <- true
158         }
159 }
160
161 // If the queue is too full, fail and remove the lowest-priority
162 // entry. Caller must have lock. Queue must not be empty.
163 func (rl *RequestLimiter) trimqueue() {
164         if len(rl.queue) <= rl.MaxQueue {
165                 return
166         }
167         min := 0
168         for i := range rl.queue {
169                 if i == 0 || rl.queue.Less(min, i) {
170                         min = i
171                 }
172         }
173         rl.queue[min].ready <- false
174         rl.queue.remove(min)
175 }
176
177 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
178         rl.mtx.Lock()
179         defer rl.mtx.Unlock()
180         qtime := time.Now()
181         var priority int64
182         if rl.Priority != nil {
183                 priority = rl.Priority(req, qtime)
184         }
185         ent := &qent{
186                 queued:   qtime,
187                 priority: priority,
188                 ready:    make(chan bool, 1),
189                 heappos:  -1,
190         }
191         if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
192                 // fast path, skip the queue
193                 rl.handling++
194                 ent.ready <- true
195                 return ent
196         }
197         if priority == math.MinInt64 {
198                 // Priority func is telling us to return 503
199                 // immediately instead of queueing, regardless of
200                 // queue size, if we can't handle the request
201                 // immediately.
202                 ent.ready <- false
203                 return ent
204         }
205         rl.queue.add(ent)
206         rl.trimqueue()
207         return ent
208 }
209
210 func (rl *RequestLimiter) remove(ent *qent) {
211         rl.mtx.Lock()
212         defer rl.mtx.Unlock()
213         if ent.heappos >= 0 {
214                 rl.queue.remove(ent.heappos)
215                 ent.ready <- false
216         }
217 }
218
// ServeHTTP either handles req with the wrapped Handler right away,
// queues it until capacity is available, or responds 503, as decided
// by enqueue/runqueue/trimqueue.
func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rl.setupOnce.Do(rl.setup)
        ent := rl.enqueue(req)
        SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
        var ok bool
        select {
        case <-req.Context().Done():
                // Client gave up (or request timed out) while queued.
                rl.remove(ent)
                // we still need to wait for ent.ready, because
                // sometimes runqueue() will have already decided to
                // send true before our rl.remove() call, and in that
                // case we'll need to decrement rl.handling below.
                ok = <-ent.ready
        case ok = <-ent.ready:
        }
        if !ok {
                // ready==false means "respond 503 now" (queue full,
                // rejected by priority, or client disconnected).
                resp.WriteHeader(http.StatusServiceUnavailable)
                return
        }
        // ready==true means rl.handling was incremented on our
        // behalf; release that slot when the handler returns.
        defer func() {
                rl.mtx.Lock()
                defer rl.mtx.Unlock()
                rl.handling--
                // unblock the next waiting request
                rl.runqueue()
        }()
        rl.Handler.ServeHTTP(resp, req)
}