20831: Fix tests
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index f9f94ff987e2b23feafdee0c38d0abbe705ad0a4..9d501ab0ebfa7db908a2886d4b208973c8606863 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -15,7 +15,13 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
+
+// Prometheus typically polls every 10 seconds, but it doesn't cost us
+// much to also accommodate higher frequency collection by updating
+// internal stats more frequently. (This limits time resolution only
+// for the metrics that aren't generated on the fly.)
+const metricsUpdateInterval = time.Second
 
 // RequestLimiter wraps http.Handler, limiting the number of
 // concurrent requests being handled by the wrapped Handler. Requests
@@ -42,15 +48,23 @@ type RequestLimiter struct {
        // handled FIFO.
        Priority func(req *http.Request, queued time.Time) int64
 
+       // Return 503 for any request for which Priority() returns
+       // MinPriority if it spends longer than this in the queue
+       // before starting processing.
+       MaxQueueTimeForMinPriority time.Duration
+
        // "concurrent_requests", "max_concurrent_requests",
        // "queued_requests", and "max_queued_requests" metrics are
        // registered with Registry, if it is not nil.
        Registry *prometheus.Registry
 
-       setupOnce sync.Once
-       mtx       sync.Mutex
-       handling  int
-       queue     queue
+       setupOnce     sync.Once
+       mQueueDelay   *prometheus.SummaryVec
+       mQueueTimeout *prometheus.SummaryVec
+       mQueueUsage   *prometheus.GaugeVec
+       mtx           sync.Mutex
+       handling      int
+       queue         queue
 }
 
 type qent struct {
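
For context, a caller wiring up these new fields might look roughly like the sketch below. Only MaxConcurrent, MaxQueue, Priority, MaxQueueTimeForMinPriority, Registry and MinPriority are visible in this diff; the Handler field name, the import path, and the header-based priority policy are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"git.arvados.org/arvados.git/sdk/go/httpserver"
)

func main() {
	rl := &httpserver.RequestLimiter{
		// Handler is assumed to be the field holding the wrapped
		// http.Handler; it does not appear in this hunk.
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
		}),
		MaxConcurrent: 4,
		MaxQueue:      64,
		// Hypothetical policy: requests carrying an "X-Low-Priority"
		// header are the first to be refused under load, and wait at
		// most MaxQueueTimeForMinPriority in the queue before getting
		// a 503.
		Priority: func(req *http.Request, queued time.Time) int64 {
			if req.Header.Get("X-Low-Priority") != "" {
				return httpserver.MinPriority
			}
			return 0
		},
		MaxQueueTimeForMinPriority: 5 * time.Second,
		Registry:                   prometheus.NewRegistry(),
	}

	// With free capacity the request is handled immediately; queueing
	// and the MinPriority timeout only matter once MaxConcurrent
	// requests are already in flight.
	resp := httptest.NewRecorder()
	rl.ServeHTTP(resp, httptest.NewRequest("GET", "/", nil))
	fmt.Println(resp.Code) // 200
}
```
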
@@ -127,18 +141,12 @@ func (rl *RequestLimiter) setup() {
                        },
                        func() float64 { return float64(rl.MaxConcurrent) },
                ))
-               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-                       prometheus.GaugeOpts{
-                               Namespace: "arvados",
-                               Name:      "queued_requests",
-                               Help:      "Number of requests in queue",
-                       },
-                       func() float64 {
-                               rl.mtx.Lock()
-                               defer rl.mtx.Unlock()
-                               return float64(len(rl.queue))
-                       },
-               ))
+               rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Name:      "queued_requests",
+                       Help:      "Number of requests in queue",
+               }, []string{"priority"})
+               rl.Registry.MustRegister(rl.mQueueUsage)
                rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
@@ -147,13 +155,40 @@ func (rl *RequestLimiter) setup() {
                        },
                        func() float64 { return float64(rl.MaxQueue) },
                ))
-               rl.mQueueDelay = prometheus.NewSummary(prometheus.SummaryOpts{
+               rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
                        Namespace:  "arvados",
                        Name:       "queue_delay_seconds",
-                       Help:       "Number of seconds spent in the incoming request queue",
+                       Help:       "Time spent in the incoming request queue before start of processing",
                        Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
-               })
-               reg.MustRegister(rl.mQueueDelay)
+               }, []string{"priority"})
+               rl.Registry.MustRegister(rl.mQueueDelay)
+               rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+                       Namespace:  "arvados",
+                       Name:       "queue_timeout_seconds",
+                       Help:       "Time spent in the incoming request queue before client timed out or disconnected",
+                       Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+               }, []string{"priority"})
+               rl.Registry.MustRegister(rl.mQueueTimeout)
+               go func() {
+                       for range time.NewTicker(metricsUpdateInterval).C {
+                               var low, normal, high int
+                               rl.mtx.Lock()
+                               for _, ent := range rl.queue {
+                                       switch {
+                                       case ent.priority < 0:
+                                               low++
+                                       case ent.priority > 0:
+                                               high++
+                                       default:
+                                               normal++
+                                       }
+                               }
+                               rl.mtx.Unlock()
+                               rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
+                               rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
+                               rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
+                       }
+               }()
        }
 }
 
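
The two new SummaryVec families export precomputed quantiles per priority label. A standalone sketch (plain client_golang, same options as the diff) of what queue_delay_seconds ends up exporting:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	delay := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Name:       "queue_delay_seconds",
		Help:       "Time spent in the incoming request queue before start of processing",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"priority"})
	reg.MustRegister(delay)

	// Simulate a few queued requests being picked up for processing.
	for _, d := range []time.Duration{120 * time.Millisecond, 350 * time.Millisecond, 900 * time.Millisecond} {
		delay.WithLabelValues("normal").Observe(d.Seconds())
	}

	// Each label set yields quantile series plus _sum and _count, e.g.
	// arvados_queue_delay_seconds{priority="normal",quantile="0.95"}.
	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			s := m.GetSummary()
			fmt.Println(mf.GetName(), "count:", s.GetSampleCount(), "sum:", s.GetSampleSum())
			for _, q := range s.GetQuantile() {
				fmt.Printf("  q%v = %vs\n", q.GetQuantile(), q.GetValue())
			}
		}
	}
}
```

Note that Summary quantiles are computed per process; they cannot be re-aggregated across instances at query time, though the _sum and _count series can be.
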
@@ -203,14 +238,6 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
                ent.ready <- true
                return ent
        }
-       if priority == IneligibleForQueuePriority {
-               // Priority func is telling us to return 503
-               // immediately instead of queueing, regardless of
-               // queue size, if we can't handle the request
-               // immediately.
-               ent.ready <- false
-               return ent
-       }
        rl.queue.add(ent)
        rl.trimqueue()
        return ent
@@ -229,6 +256,16 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
        rl.setupOnce.Do(rl.setup)
        ent := rl.enqueue(req)
        SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+       if ent.priority == MinPriority {
+               // Note that MaxQueueTimeForMinPriority==0 does not
+               // cancel a request that skips the queue, because in
+               // that case rl.enqueue() has already fired ready<-true
+               // and rl.remove() is a no-op.
+               go func() {
+                       time.Sleep(rl.MaxQueueTimeForMinPriority)
+                       rl.remove(ent)
+               }()
+       }
        var ok bool
        select {
        case <-req.Context().Done():
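
The commit message mentions fixing tests; a sketch of the kind of check this behavior enables is below. MaxConcurrent, MaxQueue, Priority, MaxQueueTimeForMinPriority and MinPriority come from this file, while the Handler field name and the import path are assumptions.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"

	"git.arvados.org/arvados.git/sdk/go/httpserver"
)

func main() {
	inHandler := make(chan struct{})
	release := make(chan struct{})

	rl := &httpserver.RequestLimiter{
		// Handler field name assumed, as in the earlier sketch.
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			close(inHandler) // the only processing slot is now busy
			<-release        // hold it until the end of the test
		}),
		MaxConcurrent: 1,
		MaxQueue:      10,
		Priority: func(req *http.Request, queued time.Time) int64 {
			if req.URL.Path == "/low" {
				return httpserver.MinPriority
			}
			return 0
		},
		MaxQueueTimeForMinPriority: 100 * time.Millisecond,
	}

	// Occupy the single processing slot.
	go rl.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/busy", nil))
	<-inHandler

	// A MinPriority request now has to queue, and should be answered
	// with 503 roughly MaxQueueTimeForMinPriority later.
	start := time.Now()
	resp := httptest.NewRecorder()
	rl.ServeHTTP(resp, httptest.NewRequest("GET", "/low", nil))
	fmt.Println(resp.Code, time.Since(start)) // expect 503 after ~100ms

	close(release)
}
```
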
@@ -240,6 +277,30 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
                ok = <-ent.ready
        case ok = <-ent.ready:
        }
+
+       // Report time spent in queue in the appropriate bucket:
+       // mQueueDelay if the request actually got processed,
+       // mQueueTimeout if it was abandoned or cancelled before
+       // getting a processing slot.
+       var series *prometheus.SummaryVec
+       if ok {
+               series = rl.mQueueDelay
+       } else {
+               series = rl.mQueueTimeout
+       }
+       if series != nil {
+               var qlabel string
+               switch {
+               case ent.priority < 0:
+                       qlabel = "low"
+               case ent.priority > 0:
+                       qlabel = "high"
+               default:
+                       qlabel = "normal"
+               }
+               series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+       }
+
        if !ok {
                resp.WriteHeader(http.StatusServiceUnavailable)
                return
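
These series are only visible to Prometheus if the same Registry that was passed to the RequestLimiter is exposed on a scrape endpoint. A minimal, assumed wiring (standard promhttp, not part of this diff):

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// reg stands in for the *prometheus.Registry assigned to
	// RequestLimiter.Registry elsewhere in the program.
	reg := prometheus.NewRegistry()

	// A scrape of /metrics then includes the series added here:
	//   arvados_queued_requests{priority="low"|"normal"|"high"}
	//   arvados_queue_delay_seconds{priority=...,quantile=...} plus _sum/_count
	//   arvados_queue_timeout_seconds{priority=...,quantile=...} plus _sum/_count
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9100", nil))
}
```
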