X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/557264f16adb0a02f733c296baf507e3e7d98478..99758c2727edaba1f3931822c94354d94c89396a:/sdk/go/httpserver/request_limiter.go diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go index 724a26fa25..9d501ab0eb 100644 --- a/sdk/go/httpserver/request_limiter.go +++ b/sdk/go/httpserver/request_limiter.go @@ -15,7 +15,13 @@ import ( "github.com/sirupsen/logrus" ) -const IneligibleForQueuePriority = math.MinInt64 +const MinPriority = math.MinInt64 + +// Prometheus typically polls every 10 seconds, but it doesn't cost us +// much to also accommodate higher frequency collection by updating +// internal stats more frequently. (This limits time resolution only +// for the metrics that aren't generated on the fly.) +const metricsUpdateInterval = time.Second // RequestLimiter wraps http.Handler, limiting the number of // concurrent requests being handled by the wrapped Handler. Requests @@ -42,16 +48,23 @@ type RequestLimiter struct { // handled FIFO. Priority func(req *http.Request, queued time.Time) int64 + // Return 503 for any request for which Priority() returns + // MinPriority if it spends longer than this in the queue + // before starting processing. + MaxQueueTimeForMinPriority time.Duration + // "concurrent_requests", "max_concurrent_requests", // "queued_requests", and "max_queued_requests" metrics are // registered with Registry, if it is not nil. 
Registry *prometheus.Registry - setupOnce sync.Once - mQueueDelay *prometheus.SummaryVec - mtx sync.Mutex - handling int - queue queue + setupOnce sync.Once + mQueueDelay *prometheus.SummaryVec + mQueueTimeout *prometheus.SummaryVec + mQueueUsage *prometheus.GaugeVec + mtx sync.Mutex + handling int + queue queue } type qent struct { @@ -128,18 +141,12 @@ func (rl *RequestLimiter) setup() { }, func() float64 { return float64(rl.MaxConcurrent) }, )) - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "queued_requests", - Help: "Number of requests in queue", - }, - func() float64 { - rl.mtx.Lock() - defer rl.mtx.Unlock() - return float64(len(rl.queue)) - }, - )) + rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "queued_requests", + Help: "Number of requests in queue", + }, []string{"priority"}) + rl.Registry.MustRegister(rl.mQueueUsage) rl.Registry.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: "arvados", @@ -151,10 +158,37 @@ func (rl *RequestLimiter) setup() { rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "arvados", Name: "queue_delay_seconds", - Help: "Time spent in the incoming request queue", + Help: "Time spent in the incoming request queue before start of processing", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, }, []string{"priority"}) rl.Registry.MustRegister(rl.mQueueDelay) + rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "arvados", + Name: "queue_timeout_seconds", + Help: "Time spent in the incoming request queue before client timed out or disconnected", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, []string{"priority"}) + rl.Registry.MustRegister(rl.mQueueTimeout) + go func() { + for range time.NewTicker(metricsUpdateInterval).C { + var low, normal, high int + rl.mtx.Lock() + for _, ent := 
range rl.queue { + switch { + case ent.priority < 0: + low++ + case ent.priority > 0: + high++ + default: + normal++ + } + } + rl.mtx.Unlock() + rl.mQueueUsage.WithLabelValues("low").Set(float64(low)) + rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal)) + rl.mQueueUsage.WithLabelValues("high").Set(float64(high)) + } + }() } } @@ -204,14 +238,6 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent { ent.ready <- true return ent } - if priority == IneligibleForQueuePriority { - // Priority func is telling us to return 503 - // immediately instead of queueing, regardless of - // queue size, if we can't handle the request - // immediately. - ent.ready <- false - return ent - } rl.queue.add(ent) rl.trimqueue() return ent @@ -230,11 +256,19 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) rl.setupOnce.Do(rl.setup) ent := rl.enqueue(req) SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority}) + if ent.priority == MinPriority { + // Note that MaxQueueTimeForMinPriority==0 does not cancel a req + // that skips the queue, because in that case + // rl.enqueue() has already fired ready<-true and + // rl.remove() is a no-op. 
+ go func() { + time.Sleep(rl.MaxQueueTimeForMinPriority) + rl.remove(ent) + }() + } var ok bool - var abandoned bool select { case <-req.Context().Done(): - abandoned = true rl.remove(ent) // we still need to wait for ent.ready, because // sometimes runqueue() will have already decided to @@ -244,21 +278,27 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) case ok = <-ent.ready: } - // report time spent in queue - var qlabel string - switch { - case abandoned: - case !ok && ent.priority == IneligibleForQueuePriority: - // don't pollute stats - case ent.priority < 0: - qlabel = "low" - case ent.priority > 0: - qlabel = "high" - default: - qlabel = "normal" + // Report time spent in queue in the appropriate bucket: + // mQueueDelay if the request actually got processed, + // mQueueTimeout if it was abandoned or cancelled before + // getting a processing slot. + var series *prometheus.SummaryVec + if ok { + series = rl.mQueueDelay + } else { + series = rl.mQueueTimeout } - if qlabel != "" && rl.mQueueDelay != nil { - rl.mQueueDelay.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds()) + if series != nil { + var qlabel string + switch { + case ent.priority < 0: + qlabel = "low" + case ent.priority > 0: + qlabel = "high" + default: + qlabel = "normal" + } + series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds()) } if !ok {