package httpserver
import (
+ "container/heap"
"math"
"net/http"
"sync"
"github.com/sirupsen/logrus"
)
+const MinPriority = math.MinInt64
+
+// Prometheus typically polls every 10 seconds, but it doesn't cost us
+// much to also accommodate higher frequency collection by updating
+// internal stats more frequently. (This limits time resolution only
+// for the metrics that aren't generated on the fly.)
+const metricsUpdateInterval = time.Second
+
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
type RequestLimiter struct {
Handler http.Handler
- // Maximum number of requests being handled at once. Beyond
- // this limit, requests will be queued.
- MaxConcurrent int
-
- // Maximum number of requests in the queue. Beyond this limit,
- // the lowest priority requests will return 503.
- MaxQueue int
+ // Queue determines which queue a request is assigned to.
+ Queue func(req *http.Request) *RequestQueue
// Priority determines queue ordering. Requests with higher
// priority are handled first. Requests with equal priority
// registered with Registry, if it is not nil.
Registry *prometheus.Registry
- setupOnce sync.Once
- mtx sync.Mutex
- handling int
- queue heap
+ setupOnce sync.Once
+ mQueueDelay *prometheus.SummaryVec
+ mQueueTimeout *prometheus.SummaryVec
+ mQueueUsage *prometheus.GaugeVec
+ mtx sync.Mutex
+ rqs map[*RequestQueue]bool // all RequestQueues in use
+}
+
+type RequestQueue struct {
+ // Label for metrics. No two queues should have the same label.
+ Label string
+
+ // Maximum number of requests being handled at once. Beyond
+ // this limit, requests will be queued.
+ MaxConcurrent int
+
+ // Maximum number of requests in the queue. Beyond this limit,
+ // the lowest priority requests will return 503.
+ MaxQueue int
+
+ // Return 503 for any request for which Priority() returns
+ // MinPriority if it spends longer than this in the queue
+ // before starting processing.
+ MaxQueueTimeForMinPriority time.Duration
+
+ queue queue
+ handling int
}
type qent struct {
+ rq *RequestQueue
queued time.Time
priority int64
heappos int
ready chan bool // true = handle now; false = return 503 now
}
-type heap []*qent
+type queue []*qent
-func (h heap) Swap(i, j int) {
+func (h queue) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].heappos, h[j].heappos = i, j
}
-func (h heap) Less(i, j int) bool {
+func (h queue) Less(i, j int) bool {
pi, pj := h[i].priority, h[j].priority
return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
}
-func (h heap) Len() int {
+func (h queue) Len() int {
return len(h)
}
-// Move element i to a correct position in the heap. When the heap is
-// empty, fix(0) is a no-op (does not crash).
-func (h heap) fix(i int) {
- // If the initial position is a leaf (i.e., index is greater
- // than the last node's parent index), we only need to move it
- // up, not down.
- uponly := i > (len(h)-2)/2
- // Move the new entry toward the root until reaching a
- // position where the parent already has higher priority.
- for i > 0 {
- parent := (i - 1) / 2
- if h.Less(i, parent) {
- h.Swap(i, parent)
- i = parent
- } else {
- break
- }
- }
- // Move i away from the root until reaching a position where
- // both children already have lower priority.
- for !uponly {
- child := i*2 + 1
- if child+1 < len(h) && h.Less(child+1, child) {
- // Right child has higher priority than left
- // child. Choose right child.
- child = child + 1
- }
- if child < len(h) && h.Less(child, i) {
- // Chosen child has higher priority than i.
- // Swap and continue down.
- h.Swap(i, child)
- i = child
- } else {
- break
- }
- }
-}
-
-func (h *heap) add(ent *qent) {
- ent.heappos = len(*h)
+func (h *queue) Push(x interface{}) {
+ n := len(*h)
+ ent := x.(*qent)
+ ent.heappos = n
*h = append(*h, ent)
- h.fix(ent.heappos)
}
-func (h *heap) removeMax() *qent {
- ent := (*h)[0]
- if len(*h) == 1 {
- *h = (*h)[:0]
- } else {
- h.Swap(0, len(*h)-1)
- *h = (*h)[:len(*h)-1]
- h.fix(0)
- }
+func (h *queue) Pop() interface{} {
+ n := len(*h)
+ ent := (*h)[n-1]
ent.heappos = -1
+ (*h)[n-1] = nil
+ *h = (*h)[0 : n-1]
return ent
}
-func (h *heap) remove(i int) {
- // Move the last leaf into i's place, then move it to a
- // correct position.
- h.Swap(i, len(*h)-1)
- *h = (*h)[:len(*h)-1]
- if i < len(*h) {
- h.fix(i)
- }
+func (h *queue) add(ent *qent) {
+ ent.heappos = h.Len()
+ h.Push(ent)
+}
+
+func (h *queue) removeMax() *qent {
+ return heap.Pop(h).(*qent)
+}
+
+func (h *queue) remove(i int) {
+ heap.Remove(h, i)
}
// setup runs once per RequestLimiter (via rl.setupOnce in ServeHTTP).
// When rl.Registry is non-nil, it registers the limiter's metrics there.
// It then starts a goroutine that, every metricsUpdateInterval, copies
// each known RequestQueue's current state into the gauges.
func (rl *RequestLimiter) setup() {
	if rl.Registry == nil {
		return
	}
	registerGauge := func(name, help string, labels ...string) *prometheus.GaugeVec {
		vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "arvados",
			Name:      name,
			Help:      help,
		}, labels)
		rl.Registry.MustRegister(vec)
		return vec
	}
	registerSummary := func(name, help string) *prometheus.SummaryVec {
		vec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
			Namespace:  "arvados",
			Name:       name,
			Help:       help,
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
		}, []string{"queue", "priority"})
		rl.Registry.MustRegister(vec)
		return vec
	}

	// Registration order matches the order callers have always seen.
	gCurrent := registerGauge("concurrent_requests", "Number of requests in progress", "queue")
	gMaxConcurrent := registerGauge("max_concurrent_requests", "Maximum number of concurrent requests", "queue")
	gMaxQueued := registerGauge("max_queued_requests", "Maximum number of queued requests", "queue")
	rl.mQueueUsage = registerGauge("queued_requests", "Number of requests in queue", "queue", "priority")
	rl.mQueueDelay = registerSummary("queue_delay_seconds", "Time spent in the incoming request queue before start of processing")
	rl.mQueueTimeout = registerSummary("queue_timeout_seconds", "Time spent in the incoming request queue before client timed out or disconnected")

	classNames := [...]string{"low", "normal", "high"}
	refresh := func() {
		rl.mtx.Lock()
		defer rl.mtx.Unlock()
		for rq := range rl.rqs {
			// Tally waiting entries per priority class:
			// index 0 = negative, 1 = zero, 2 = positive.
			var tally [3]int
			for _, ent := range rq.queue {
				class := 1
				if ent.priority < 0 {
					class = 0
				} else if ent.priority > 0 {
					class = 2
				}
				tally[class]++
			}
			gCurrent.WithLabelValues(rq.Label).Set(float64(rq.handling))
			gMaxConcurrent.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent))
			gMaxQueued.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue))
			for i, name := range classNames {
				rl.mQueueUsage.WithLabelValues(rq.Label, name).Set(float64(tally[i]))
			}
		}
	}

	go func() {
		ticker := time.NewTicker(metricsUpdateInterval)
		for range ticker.C {
			refresh()
		}
	}()
}
// caller must have lock
-func (rl *RequestLimiter) runqueue() {
+func (rq *RequestQueue) runqueue() {
// Handle entries from the queue as capacity permits
- for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
- rl.handling++
- ent := rl.queue.removeMax()
- ent.heappos = -1
+ for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) {
+ rq.handling++
+ ent := rq.queue.removeMax()
ent.ready <- true
}
}
// If the queue is too full, fail and remove the lowest-priority
// entry. Caller must have lock. Queue must not be empty.
-func (rl *RequestLimiter) trimqueue() {
- if len(rl.queue) <= rl.MaxQueue {
+func (rq *RequestQueue) trimqueue() {
+ if len(rq.queue) <= rq.MaxQueue {
return
}
min := 0
- for i := range rl.queue {
- if i == 0 || rl.queue.Less(min, i) {
+ for i := range rq.queue {
+ if i == 0 || rq.queue.Less(min, i) {
min = i
}
}
- rl.queue[min].heappos = -1
- rl.queue[min].ready <- false
- rl.queue.remove(min)
+ rq.queue[min].ready <- false
+ rq.queue.remove(min)
}
func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
priority = rl.Priority(req, qtime)
}
ent := &qent{
+ rq: rl.Queue(req),
queued: qtime,
priority: priority,
ready: make(chan bool, 1),
heappos: -1,
}
- if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+ if rl.rqs == nil {
+ rl.rqs = map[*RequestQueue]bool{}
+ }
+ rl.rqs[ent.rq] = true
+ if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling {
// fast path, skip the queue
- rl.handling++
+ ent.rq.handling++
ent.ready <- true
return ent
}
- if priority == math.MinInt64 {
- // Priority func is telling us to return 503
- // immediately instead of queueing, regardless of
- // queue size, if we can't handle the request
- // immediately.
- ent.ready <- false
- return ent
- }
- rl.queue.add(ent)
- rl.trimqueue()
+ ent.rq.queue.add(ent)
+ ent.rq.trimqueue()
return ent
}
rl.mtx.Lock()
defer rl.mtx.Unlock()
if ent.heappos >= 0 {
- rl.queue.remove(ent.heappos)
- ent.heappos = -1
+ ent.rq.queue.remove(ent.heappos)
ent.ready <- false
}
}
func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rl.setupOnce.Do(rl.setup)
ent := rl.enqueue(req)
- SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+ SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label})
+ if ent.priority == MinPriority {
+ // Note that MaxQueueTime==0 does not cancel a req
+ // that skips the queue, because in that case
+ // rl.enqueue() has already fired ready<-true and
+ // rl.remove() is a no-op.
+ go func() {
+ time.Sleep(ent.rq.MaxQueueTimeForMinPriority)
+ rl.remove(ent)
+ }()
+ }
var ok bool
select {
case <-req.Context().Done():
// we still need to wait for ent.ready, because
// sometimes runqueue() will have already decided to
// send true before our rl.remove() call, and in that
- // case we'll need to decrement rl.handling below.
+ // case we'll need to decrement ent.rq.handling below.
ok = <-ent.ready
case ok = <-ent.ready:
}
+
+ // Report time spent in queue in the appropriate bucket:
+ // mQueueDelay if the request actually got processed,
+ // mQueueTimeout if it was abandoned or cancelled before
+ // getting a processing slot.
+ var series *prometheus.SummaryVec
+ if ok {
+ series = rl.mQueueDelay
+ } else {
+ series = rl.mQueueTimeout
+ }
+ if series != nil {
+ var qlabel string
+ switch {
+ case ent.priority < 0:
+ qlabel = "low"
+ case ent.priority > 0:
+ qlabel = "high"
+ default:
+ qlabel = "normal"
+ }
+ series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+ }
+
if !ok {
resp.WriteHeader(http.StatusServiceUnavailable)
return
defer func() {
rl.mtx.Lock()
defer rl.mtx.Unlock()
- rl.handling--
+ ent.rq.handling--
// unblock the next waiting request
- rl.runqueue()
+ ent.rq.runqueue()
}()
rl.Handler.ServeHTTP(resp, req)
}