X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/032958b6a12030fff3151784f6970842281ca076..99758c2727edaba1f3931822c94354d94c89396a:/sdk/go/httpserver/request_limiter.go diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go index f9f94ff987..9d501ab0eb 100644 --- a/sdk/go/httpserver/request_limiter.go +++ b/sdk/go/httpserver/request_limiter.go @@ -15,7 +15,13 @@ import ( "github.com/sirupsen/logrus" ) -const IneligibleForQueuePriority = math.MinInt64 +const MinPriority = math.MinInt64 + +// Prometheus typically polls every 10 seconds, but it doesn't cost us +// much to also accommodate higher frequency collection by updating +// internal stats more frequently. (This limits time resolution only +// for the metrics that aren't generated on the fly.) +const metricsUpdateInterval = time.Second // RequestLimiter wraps http.Handler, limiting the number of // concurrent requests being handled by the wrapped Handler. Requests @@ -42,15 +48,23 @@ type RequestLimiter struct { // handled FIFO. Priority func(req *http.Request, queued time.Time) int64 + // Return 503 for any request for which Priority() returns + // MinPriority if it spends longer than this in the queue + // before starting processing. + MaxQueueTimeForMinPriority time.Duration + // "concurrent_requests", "max_concurrent_requests", // "queued_requests", and "max_queued_requests" metrics are // registered with Registry, if it is not nil. Registry *prometheus.Registry - setupOnce sync.Once - mtx sync.Mutex - handling int - queue queue + setupOnce sync.Once + mQueueDelay *prometheus.SummaryVec + mQueueTimeout *prometheus.SummaryVec + mQueueUsage *prometheus.GaugeVec + mtx sync.Mutex + handling int + queue queue } type qent struct { @@ -127,18 +141,12 @@ func (rl *RequestLimiter) setup() { }, func() float64 { return float64(rl.MaxConcurrent) }, )) - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "queued_requests", - Help: "Number of requests in queue", - }, - func() float64 { - rl.mtx.Lock() - defer rl.mtx.Unlock() - return float64(len(rl.queue)) - }, - )) + rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "queued_requests", + Help: "Number of requests in queue", + }, []string{"priority"}) + rl.Registry.MustRegister(rl.mQueueUsage) rl.Registry.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: "arvados", @@ -147,13 +155,40 @@ func (rl *RequestLimiter) setup() { }, func() float64 { return float64(rl.MaxQueue) }, )) - rl.mQueueDelay = prometheus.NewSummary(prometheus.SummaryOpts{ + rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "arvados", Name: "queue_delay_seconds", - Help: "Number of seconds spent in the incoming request queue", + Help: "Time spent in the incoming request queue before start of processing", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }) - reg.MustRegister(rl.mQueueDelay) + }, []string{"priority"}) + rl.Registry.MustRegister(rl.mQueueDelay) + rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "arvados", + Name: "queue_timeout_seconds", + Help: "Time spent in the incoming request queue before client timed out or disconnected", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, []string{"priority"}) + rl.Registry.MustRegister(rl.mQueueTimeout) + go func() { + for range time.NewTicker(metricsUpdateInterval).C { + var low, normal, high int + rl.mtx.Lock() + for _, ent := range rl.queue { + switch { + case ent.priority < 0: + low++ + case ent.priority > 0: + high++ + default: + normal++ + } + } + rl.mtx.Unlock() + rl.mQueueUsage.WithLabelValues("low").Set(float64(low)) + rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal)) + rl.mQueueUsage.WithLabelValues("high").Set(float64(high)) + } + }() } } @@ -203,14 +238,6 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent { ent.ready <- true return ent } - if priority == IneligibleForQueuePriority { - // Priority func is telling us to return 503 - // immediately instead of queueing, regardless of - // queue size, if we can't handle the request - // immediately. - ent.ready <- false - return ent - } rl.queue.add(ent) rl.trimqueue() return ent @@ -229,6 +256,16 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) rl.setupOnce.Do(rl.setup) ent := rl.enqueue(req) SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority}) + if ent.priority == MinPriority { + // Note that MaxQueueTime==0 does not cancel a req + // that skips the queue, because in that case + // rl.enqueue() has already fired ready<-true and + // rl.remove() is a no-op. + go func() { + time.Sleep(rl.MaxQueueTimeForMinPriority) + rl.remove(ent) + }() + } var ok bool select { case <-req.Context().Done(): @@ -240,6 +277,30 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) ok = <-ent.ready case ok = <-ent.ready: } + + // Report time spent in queue in the appropriate bucket: + // mQueueDelay if the request actually got processed, + // mQueueTimeout if it was abandoned or cancelled before + // getting a processing slot. + var series *prometheus.SummaryVec + if ok { + series = rl.mQueueDelay + } else { + series = rl.mQueueTimeout + } + if series != nil { + var qlabel string + switch { + case ent.priority < 0: + qlabel = "low" + case ent.priority > 0: + qlabel = "high" + default: + qlabel = "normal" + } + series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds()) + } + if !ok { resp.WriteHeader(http.StatusServiceUnavailable) return