diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 2bb0a567305ca880b59e71dda6329fd4927592ca..1e3316ed487d17ca2eade2655ad3bfb04c8c6851 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -5,6 +5,7 @@
 package httpserver
 
 import (
+       "container/heap"
        "math"
        "net/http"
        "sync"
@@ -14,6 +15,14 @@ import (
        "github.com/sirupsen/logrus"
 )
 
+const MinPriority = math.MinInt64
+
+// Prometheus typically polls every 10 seconds, but it doesn't cost us
+// much to also accommodate higher frequency collection by updating
+// internal stats more frequently. (This limits time resolution only
+// for the metrics that aren't generated on the fly.)
+const metricsUpdateInterval = time.Second
+
 // RequestLimiter wraps http.Handler, limiting the number of
 // concurrent requests being handled by the wrapped Handler. Requests
 // that arrive when the handler is already at the specified
@@ -25,13 +34,8 @@ import (
 type RequestLimiter struct {
        Handler http.Handler
 
-       // Maximum number of requests being handled at once. Beyond
-       // this limit, requests will be queued.
-       MaxConcurrent int
-
-       // Maximum number of requests in the queue. Beyond this limit,
-       // the lowest priority requests will return 503.
-       MaxQueue int
+       // Queue determines which queue a request is assigned to.
+       Queue func(req *http.Request) *RequestQueue
 
        // Priority determines queue ordering. Requests with higher
        // priority are handled first. Requests with equal priority
@@ -44,173 +48,180 @@ type RequestLimiter struct {
        // registered with Registry, if it is not nil.
        Registry *prometheus.Registry
 
-       setupOnce sync.Once
-       mtx       sync.Mutex
-       handling  int
-       queue     heap
+       setupOnce     sync.Once
+       mQueueDelay   *prometheus.SummaryVec
+       mQueueTimeout *prometheus.SummaryVec
+       mQueueUsage   *prometheus.GaugeVec
+       mtx           sync.Mutex
+       rqs           map[*RequestQueue]bool // all RequestQueues in use
+}
+
+type RequestQueue struct {
+       // Label for metrics. No two queues should have the same label.
+       Label string
+
+       // Maximum number of requests being handled at once. Beyond
+       // this limit, requests will be queued.
+       MaxConcurrent int
+
+       // Maximum number of requests in the queue. Beyond this limit,
+       // the lowest priority requests will return 503.
+       MaxQueue int
+
+       // If a request's Priority() is MinPriority and it has been
+       // waiting in the queue longer than this, return 503 instead
+       // of starting to process it.
+       MaxQueueTimeForMinPriority time.Duration
+
+       queue    queue
+       handling int
 }
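
For orientation, here is a minimal usage sketch of the new two-type API. This is not part of the commit: the handler, the sizing, and the priority rule are invented, and the import path is assumed from the arvados Go SDK layout.

    package main

    import (
        "log"
        "net/http"
        "time"

        "git.arvados.org/arvados.git/sdk/go/httpserver"
    )

    func main() {
        // One queue for every request; a real Queue func can route
        // different requests to different *RequestQueue values.
        apiQueue := &httpserver.RequestQueue{
            Label:                      "api",
            MaxConcurrent:              64,
            MaxQueue:                   128,
            MaxQueueTimeForMinPriority: 5 * time.Second,
        }
        rl := &httpserver.RequestLimiter{
            Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.Write([]byte("ok\n"))
            }),
            Queue: func(req *http.Request) *httpserver.RequestQueue { return apiQueue },
            // Hypothetical rule: health checks get MinPriority, so
            // they are shed first when the queue fills up.
            Priority: func(req *http.Request, queued time.Time) int64 {
                if req.URL.Path == "/healthz" {
                    return httpserver.MinPriority
                }
                return 0
            },
        }
        log.Fatal(http.ListenAndServe(":8080", rl))
    }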
 
 type qent struct {
+       rq       *RequestQueue
        queued   time.Time
        priority int64
        heappos  int
        ready    chan bool // true = handle now; false = return 503 now
 }
 
-type heap []*qent
+type queue []*qent
 
-func (h heap) Swap(i, j int) {
+func (h queue) Swap(i, j int) {
        h[i], h[j] = h[j], h[i]
        h[i].heappos, h[j].heappos = i, j
 }
 
-func (h heap) Less(i, j int) bool {
+func (h queue) Less(i, j int) bool {
        pi, pj := h[i].priority, h[j].priority
        return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
 }
 
-func (h heap) Len() int {
+func (h queue) Len() int {
        return len(h)
 }
 
-// Move element i to a correct position in the heap. When the heap is
-// empty, fix(0) is a no-op (does not crash).
-func (h heap) fix(i int) {
-       // If the initial position is a leaf (i.e., index is greater
-       // than the last node's parent index), we only need to move it
-       // up, not down.
-       uponly := i > (len(h)-2)/2
-       // Move the new entry toward the root until reaching a
-       // position where the parent already has higher priority.
-       for i > 0 {
-               parent := (i - 1) / 2
-               if h.Less(i, parent) {
-                       h.Swap(i, parent)
-                       i = parent
-               } else {
-                       break
-               }
-       }
-       // Move i away from the root until reaching a position where
-       // both children already have lower priority.
-       for !uponly {
-               child := i*2 + 1
-               if child+1 < len(h) && h.Less(child+1, child) {
-                       // Right child has higher priority than left
-                       // child. Choose right child.
-                       child = child + 1
-               }
-               if child < len(h) && h.Less(child, i) {
-                       // Chosen child has higher priority than i.
-                       // Swap and continue down.
-                       h.Swap(i, child)
-                       i = child
-               } else {
-                       break
-               }
-       }
-}
-
-func (h *heap) add(ent *qent) {
-       ent.heappos = len(*h)
+func (h *queue) Push(x interface{}) {
+       n := len(*h)
+       ent := x.(*qent)
+       ent.heappos = n
        *h = append(*h, ent)
-       h.fix(ent.heappos)
 }
 
-func (h *heap) removeMax() *qent {
-       ent := (*h)[0]
-       if len(*h) == 1 {
-               *h = (*h)[:0]
-       } else {
-               h.Swap(0, len(*h)-1)
-               *h = (*h)[:len(*h)-1]
-               h.fix(0)
-       }
+func (h *queue) Pop() interface{} {
+       n := len(*h)
+       ent := (*h)[n-1]
        ent.heappos = -1
+       (*h)[n-1] = nil
+       *h = (*h)[0 : n-1]
        return ent
 }
 
-func (h *heap) remove(i int) {
-       // Move the last leaf into i's place, then move it to a
-       // correct position.
-       h.Swap(i, len(*h)-1)
-       *h = (*h)[:len(*h)-1]
-       if i < len(*h) {
-               h.fix(i)
-       }
+func (h *queue) add(ent *qent) {
+       ent.heappos = h.Len()
+       heap.Push(h, ent)
+}
+
+func (h *queue) removeMax() *qent {
+       return heap.Pop(h).(*qent)
+}
+
+func (h *queue) remove(i int) {
+       heap.Remove(h, i)
 }
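
The hand-rolled sift-up/sift-down code is gone; queue now just satisfies heap.Interface and lets container/heap do the work. A self-contained sketch of the same pattern (independent of this package) showing that Pop yields the highest-priority entry, FIFO among equals:

    package main

    import (
        "container/heap"
        "fmt"
        "time"
    )

    type ent struct {
        name     string
        queued   time.Time
        priority int64
    }

    // q orders like the patch's queue: higher priority first,
    // earlier arrival breaking ties.
    type q []*ent

    func (h q) Len() int      { return len(h) }
    func (h q) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
    func (h q) Less(i, j int) bool {
        pi, pj := h[i].priority, h[j].priority
        return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
    }
    func (h *q) Push(x interface{}) { *h = append(*h, x.(*ent)) }
    func (h *q) Pop() interface{} {
        n := len(*h)
        e := (*h)[n-1]
        *h = (*h)[:n-1]
        return e
    }

    func main() {
        now := time.Now()
        h := &q{}
        heap.Push(h, &ent{"low", now, 0})
        heap.Push(h, &ent{"high-later", now.Add(time.Second), 1})
        heap.Push(h, &ent{"high-early", now, 1})
        for h.Len() > 0 {
            fmt.Println(heap.Pop(h).(*ent).name)
        }
        // Prints: high-early, high-later, low
    }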
 
 func (rl *RequestLimiter) setup() {
        if rl.Registry != nil {
-               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace: "arvados",
-                               Name:      "concurrent_requests",
-                               Help:      "Number of requests in progress",
-                       },
-                       func() float64 {
-                               rl.mtx.Lock()
-                               defer rl.mtx.Unlock()
-                               return float64(rl.handling)
-                       },
-               ))
-               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace: "arvados",
-                               Name:      "max_concurrent_requests",
-                               Help:      "Maximum number of concurrent requests",
-                       },
-                       func() float64 { return float64(rl.MaxConcurrent) },
-               ))
-               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace: "arvados",
-                               Name:      "queued_requests",
-                               Help:      "Number of requests in queue",
-                       },
-                       func() float64 {
+               mCurrentReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Name:      "concurrent_requests",
+                       Help:      "Number of requests in progress",
+               }, []string{"queue"})
+               rl.Registry.MustRegister(mCurrentReqs)
+               mMaxReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Name:      "max_concurrent_requests",
+                       Help:      "Maximum number of concurrent requests",
+               }, []string{"queue"})
+               rl.Registry.MustRegister(mMaxReqs)
+               mMaxQueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Name:      "max_queued_requests",
+                       Help:      "Maximum number of queued requests",
+               }, []string{"queue"})
+               rl.Registry.MustRegister(mMaxQueue)
+               rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Name:      "queued_requests",
+                       Help:      "Number of requests in queue",
+               }, []string{"queue", "priority"})
+               rl.Registry.MustRegister(rl.mQueueUsage)
+               rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+                       Namespace:  "arvados",
+                       Name:       "queue_delay_seconds",
+                       Help:       "Time spent in the incoming request queue before start of processing",
+                       Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+               }, []string{"queue", "priority"})
+               rl.Registry.MustRegister(rl.mQueueDelay)
+               rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+                       Namespace:  "arvados",
+                       Name:       "queue_timeout_seconds",
+                       Help:       "Time spent in the incoming request queue before client timed out or disconnected",
+                       Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+               }, []string{"queue", "priority"})
+               rl.Registry.MustRegister(rl.mQueueTimeout)
+               go func() {
+                       for range time.NewTicker(metricsUpdateInterval).C {
                                rl.mtx.Lock()
-                               defer rl.mtx.Unlock()
-                               return float64(len(rl.queue))
-                       },
-               ))
-               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace: "arvados",
-                               Name:      "max_queued_requests",
-                               Help:      "Maximum number of queued requests",
-                       },
-                       func() float64 { return float64(rl.MaxQueue) },
-               ))
+                               for rq := range rl.rqs {
+                                       var low, normal, high int
+                                       for _, ent := range rq.queue {
+                                               switch {
+                                               case ent.priority < 0:
+                                                       low++
+                                               case ent.priority > 0:
+                                                       high++
+                                               default:
+                                                       normal++
+                                               }
+                                       }
+                                       mCurrentReqs.WithLabelValues(rq.Label).Set(float64(rq.handling))
+                                       mMaxReqs.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent))
+                                       mMaxQueue.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue))
+                                       rl.mQueueUsage.WithLabelValues(rq.Label, "low").Set(float64(low))
+                                       rl.mQueueUsage.WithLabelValues(rq.Label, "normal").Set(float64(normal))
+                                       rl.mQueueUsage.WithLabelValues(rq.Label, "high").Set(float64(high))
+                               }
+                               rl.mtx.Unlock()
+                       }
+               }()
        }
 }
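
A sketch of what this buys downstream (again not from the commit; import path assumed as above). Metrics registration happens lazily on the first request, and the gauges are filled in by the ticker goroutine above, so a scrape a moment later sees one series per queue label:

    package main

    import (
        "fmt"
        "net/http"
        "net/http/httptest"
        "time"

        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        reg := prometheus.NewRegistry()
        rq := &httpserver.RequestQueue{Label: "api", MaxConcurrent: 4, MaxQueue: 8}
        rl := &httpserver.RequestLimiter{
            Handler:  http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
            Queue:    func(*http.Request) *httpserver.RequestQueue { return rq },
            Registry: reg,
        }
        // The first request triggers setup() and registers the metrics.
        rl.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))
        // Give the metricsUpdateInterval ticker one cycle to update
        // the per-queue gauges.
        time.Sleep(1500 * time.Millisecond)
        mfs, _ := reg.Gather()
        for _, mf := range mfs {
            fmt.Println(mf.GetName())
        }
        // Expect names such as arvados_concurrent_requests,
        // arvados_max_concurrent_requests, arvados_max_queued_requests,
        // arvados_queued_requests and arvados_queue_delay_seconds,
        // each carrying a queue="api" label.
    }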
 
 // caller must have lock
-func (rl *RequestLimiter) runqueue() {
+func (rq *RequestQueue) runqueue() {
        // Handle entries from the queue as capacity permits
-       for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
-               rl.handling++
-               ent := rl.queue.removeMax()
-               ent.heappos = -1
+       for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) {
+               rq.handling++
+               ent := rq.queue.removeMax()
                ent.ready <- true
        }
 }
 
 // If the queue is too full, fail and remove the lowest-priority
 // entry. Caller must have lock. Queue must not be empty.
-func (rl *RequestLimiter) trimqueue() {
-       if len(rl.queue) <= rl.MaxQueue {
+func (rq *RequestQueue) trimqueue() {
+       if len(rq.queue) <= rq.MaxQueue {
                return
        }
        min := 0
-       for i := range rl.queue {
-               if i == 0 || rl.queue.Less(min, i) {
+       for i := range rq.queue {
+               if i == 0 || rq.queue.Less(min, i) {
                        min = i
                }
        }
-       rl.queue[min].heappos = -1
-       rl.queue[min].ready <- false
-       rl.queue.remove(min)
+       rq.queue[min].ready <- false
+       rq.queue.remove(min)
 }
 
 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
@@ -222,27 +233,24 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
                priority = rl.Priority(req, qtime)
        }
        ent := &qent{
+               rq:       rl.Queue(req),
                queued:   qtime,
                priority: priority,
                ready:    make(chan bool, 1),
                heappos:  -1,
        }
-       if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+       if rl.rqs == nil {
+               rl.rqs = map[*RequestQueue]bool{}
+       }
+       rl.rqs[ent.rq] = true
+       if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling {
                // fast path, skip the queue
-               rl.handling++
+               ent.rq.handling++
                ent.ready <- true
                return ent
        }
-       if priority == math.MinInt64 {
-               // Priority func is telling us to return 503
-               // immediately instead of queueing, regardless of
-               // queue size, if we can't handle the request
-               // immediately.
-               ent.ready <- false
-               return ent
-       }
-       rl.queue.add(ent)
-       rl.trimqueue()
+       ent.rq.queue.add(ent)
+       ent.rq.trimqueue()
        return ent
 }
 
@@ -250,8 +258,7 @@ func (rl *RequestLimiter) remove(ent *qent) {
        rl.mtx.Lock()
        defer rl.mtx.Unlock()
        if ent.heappos >= 0 {
-               rl.queue.remove(ent.heappos)
-               ent.heappos = -1
+               ent.rq.queue.remove(ent.heappos)
                ent.ready <- false
        }
 }
@@ -259,7 +266,17 @@ func (rl *RequestLimiter) remove(ent *qent) {
 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rl.setupOnce.Do(rl.setup)
        ent := rl.enqueue(req)
-       SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+       SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label})
+       if ent.priority == MinPriority {
+               // Note that MaxQueueTimeForMinPriority==0 does not
+               // cancel a request that skipped the queue: in that
+               // case rl.enqueue() has already sent ready<-true,
+               // and rl.remove() is a no-op.
+               go func() {
+                       time.Sleep(ent.rq.MaxQueueTimeForMinPriority)
+                       rl.remove(ent)
+               }()
+       }
        var ok bool
        select {
        case <-req.Context().Done():
@@ -267,10 +284,34 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
                // we still need to wait for ent.ready, because
                // sometimes runqueue() will have already decided to
                // send true before our rl.remove() call, and in that
-               // case we'll need to decrement rl.handling below.
+               // case we'll need to decrement ent.rq.handling below.
                ok = <-ent.ready
        case ok = <-ent.ready:
        }
+
+       // Report time spent in queue in the appropriate bucket:
+       // mQueueDelay if the request actually got processed,
+       // mQueueTimeout if it was abandoned or cancelled before
+       // getting a processing slot.
+       var series *prometheus.SummaryVec
+       if ok {
+               series = rl.mQueueDelay
+       } else {
+               series = rl.mQueueTimeout
+       }
+       if series != nil {
+               var qlabel string
+               switch {
+               case ent.priority < 0:
+                       qlabel = "low"
+               case ent.priority > 0:
+                       qlabel = "high"
+               default:
+                       qlabel = "normal"
+               }
+               series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Since(ent.queued).Seconds())
+       }
+
        if !ok {
                resp.WriteHeader(http.StatusServiceUnavailable)
                return
@@ -278,9 +319,9 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
        defer func() {
                rl.mtx.Lock()
                defer rl.mtx.Unlock()
-               rl.handling--
+               ent.rq.handling--
                // unblock the next waiting request
-               rl.runqueue()
+               ent.rq.runqueue()
        }()
        rl.Handler.ServeHTTP(resp, req)
 }
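
To tie the behavior change together, a hypothetical test-style sketch (written as if it lived inside the httpserver package, so it can construct the types directly): the first request pins the only concurrency slot, the second has MinPriority and is therefore queued, and after MaxQueueTimeForMinPriority it receives 503 instead of waiting indefinitely. Before this commit, such a request was rejected immediately whenever the handler was at capacity.

    package httpserver

    import (
        "net/http"
        "net/http/httptest"
        "testing"
        "time"
    )

    func TestMinPriorityTimeout(t *testing.T) {
        block := make(chan struct{})
        rq := &RequestQueue{
            Label:                      "test",
            MaxConcurrent:              1,
            MaxQueue:                   10,
            MaxQueueTimeForMinPriority: 100 * time.Millisecond,
        }
        rl := &RequestLimiter{
            // The handler blocks until released, pinning the slot.
            Handler:  http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { <-block }),
            Queue:    func(*http.Request) *RequestQueue { return rq },
            Priority: func(*http.Request, time.Time) int64 { return MinPriority },
        }
        go rl.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))
        time.Sleep(10 * time.Millisecond) // let the first request start
        resp := httptest.NewRecorder()
        rl.ServeHTTP(resp, httptest.NewRequest("GET", "/", nil)) // queued, then timed out
        if resp.Code != http.StatusServiceUnavailable {
            t.Errorf("got status %d, want 503", resp.Code)
        }
        close(block) // release the first request
    }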