X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7cd9a0c1fa550f68fbda77c672891200fc85dcb8..4179707fc19db62fd496b54283258cb89e08f78f:/sdk/go/httpserver/request_limiter.go

diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 9d501ab0eb..1e3316ed48 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -34,13 +34,8 @@ const metricsUpdateInterval = time.Second
 type RequestLimiter struct {
 	Handler http.Handler
 
-	// Maximum number of requests being handled at once. Beyond
-	// this limit, requests will be queued.
-	MaxConcurrent int
-
-	// Maximum number of requests in the queue. Beyond this limit,
-	// the lowest priority requests will return 503.
-	MaxQueue int
+	// Queue determines which queue a request is assigned to.
+	Queue func(req *http.Request) *RequestQueue
 
 	// Priority determines queue ordering. Requests with higher
 	// priority are handled first. Requests with equal priority
@@ -48,11 +43,6 @@ type RequestLimiter struct {
 	// handled FIFO.
 	Priority func(req *http.Request, queued time.Time) int64
 
-	// Return 503 for any request for which Priority() returns
-	// MinPriority if it spends longer than this in the queue
-	// before starting processing.
-	MaxQueueTimeForMinPriority time.Duration
-
 	// "concurrent_requests", "max_concurrent_requests",
 	// "queued_requests", and "max_queued_requests" metrics are
 	// registered with Registry, if it is not nil.
@@ -63,11 +53,32 @@ type RequestLimiter struct {
 	mQueueTimeout *prometheus.SummaryVec
 	mQueueUsage   *prometheus.GaugeVec
 	mtx           sync.Mutex
-	handling      int
-	queue         queue
+	rqs           map[*RequestQueue]bool // all RequestQueues in use
+}
+
+type RequestQueue struct {
+	// Label for metrics. No two queues should have the same label.
+	Label string
+
+	// Maximum number of requests being handled at once. Beyond
+	// this limit, requests will be queued.
+	MaxConcurrent int
+
+	// Maximum number of requests in the queue. Beyond this limit,
+	// the lowest priority requests will return 503.
+	MaxQueue int
+
+	// Return 503 for any request for which Priority() returns
+	// MinPriority if it spends longer than this in the queue
+	// before starting processing.
+	MaxQueueTimeForMinPriority time.Duration
+
+	queue    queue
+	handling int
 }
 
 type qent struct {
+	rq       *RequestQueue
 	queued   time.Time
 	priority int64
 	heappos  int
@@ -121,101 +132,96 @@ func (h *queue) remove(i int) {
 
 func (rl *RequestLimiter) setup() {
 	if rl.Registry != nil {
-		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace: "arvados",
-				Name:      "concurrent_requests",
-				Help:      "Number of requests in progress",
-			},
-			func() float64 {
-				rl.mtx.Lock()
-				defer rl.mtx.Unlock()
-				return float64(rl.handling)
-			},
-		))
-		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace: "arvados",
-				Name:      "max_concurrent_requests",
-				Help:      "Maximum number of concurrent requests",
-			},
-			func() float64 { return float64(rl.MaxConcurrent) },
-		))
+		mCurrentReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Name:      "concurrent_requests",
+			Help:      "Number of requests in progress",
+		}, []string{"queue"})
+		rl.Registry.MustRegister(mCurrentReqs)
+		mMaxReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Name:      "max_concurrent_requests",
+			Help:      "Maximum number of concurrent requests",
+		}, []string{"queue"})
+		rl.Registry.MustRegister(mMaxReqs)
+		mMaxQueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Name:      "max_queued_requests",
+			Help:      "Maximum number of queued requests",
+		}, []string{"queue"})
+		rl.Registry.MustRegister(mMaxQueue)
 		rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Namespace: "arvados",
 			Name:      "queued_requests",
 			Help:      "Number of requests in queue",
-		}, []string{"priority"})
+		}, []string{"queue", "priority"})
 		rl.Registry.MustRegister(rl.mQueueUsage)
-		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace: "arvados",
-				Name:      "max_queued_requests",
-				Help:      "Maximum number of queued requests",
-			},
-			func() float64 { return float64(rl.MaxQueue) },
-		))
 		rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 			Namespace:  "arvados",
 			Name:       "queue_delay_seconds",
 			Help:       "Time spent in the incoming request queue before start of processing",
 			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
-		}, []string{"priority"})
+		}, []string{"queue", "priority"})
 		rl.Registry.MustRegister(rl.mQueueDelay)
 		rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 			Namespace:  "arvados",
 			Name:       "queue_timeout_seconds",
 			Help:       "Time spent in the incoming request queue before client timed out or disconnected",
 			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
-		}, []string{"priority"})
+		}, []string{"queue", "priority"})
 		rl.Registry.MustRegister(rl.mQueueTimeout)
 		go func() {
 			for range time.NewTicker(metricsUpdateInterval).C {
-				var low, normal, high int
 				rl.mtx.Lock()
-				for _, ent := range rl.queue {
-					switch {
-					case ent.priority < 0:
-						low++
-					case ent.priority > 0:
-						high++
-					default:
-						normal++
+				for rq := range rl.rqs {
+					var low, normal, high int
+					for _, ent := range rq.queue {
+						switch {
+						case ent.priority < 0:
+							low++
+						case ent.priority > 0:
+							high++
+						default:
+							normal++
+						}
 					}
+					mCurrentReqs.WithLabelValues(rq.Label).Set(float64(rq.handling))
+					mMaxReqs.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent))
+					mMaxQueue.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue))
+					rl.mQueueUsage.WithLabelValues(rq.Label, "low").Set(float64(low))
+					rl.mQueueUsage.WithLabelValues(rq.Label, "normal").Set(float64(normal))
+					rl.mQueueUsage.WithLabelValues(rq.Label, "high").Set(float64(high))
 				}
 				rl.mtx.Unlock()
-				rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
-				rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
-				rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
 			}
 		}()
 	}
 }
 
 // caller must have lock
-func (rl *RequestLimiter) runqueue() {
+func (rq *RequestQueue) runqueue() {
 	// Handle entries from the queue as capacity permits
-	for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
-		rl.handling++
-		ent := rl.queue.removeMax()
+	for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) {
+		rq.handling++
+		ent := rq.queue.removeMax()
 		ent.ready <- true
 	}
 }
 
 // If the queue is too full, fail and remove the lowest-priority
 // entry. Caller must have lock. Queue must not be empty.
-func (rl *RequestLimiter) trimqueue() {
-	if len(rl.queue) <= rl.MaxQueue {
+func (rq *RequestQueue) trimqueue() {
+	if len(rq.queue) <= rq.MaxQueue {
 		return
 	}
 	min := 0
-	for i := range rl.queue {
-		if i == 0 || rl.queue.Less(min, i) {
+	for i := range rq.queue {
+		if i == 0 || rq.queue.Less(min, i) {
 			min = i
 		}
 	}
-	rl.queue[min].ready <- false
-	rl.queue.remove(min)
+	rq.queue[min].ready <- false
+	rq.queue.remove(min)
 }
 
 func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
@@ -227,19 +233,24 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
 		priority = rl.Priority(req, qtime)
 	}
 	ent := &qent{
+		rq:       rl.Queue(req),
 		queued:   qtime,
 		priority: priority,
 		ready:    make(chan bool, 1),
 		heappos:  -1,
 	}
-	if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+	if rl.rqs == nil {
+		rl.rqs = map[*RequestQueue]bool{}
+	}
+	rl.rqs[ent.rq] = true
+	if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling {
 		// fast path, skip the queue
-		rl.handling++
+		ent.rq.handling++
 		ent.ready <- true
 		return ent
 	}
-	rl.queue.add(ent)
-	rl.trimqueue()
+	ent.rq.queue.add(ent)
+	ent.rq.trimqueue()
 	return ent
 }
 
@@ -247,7 +258,7 @@ func (rl *RequestLimiter) remove(ent *qent) {
 	rl.mtx.Lock()
 	defer rl.mtx.Unlock()
 	if ent.heappos >= 0 {
-		rl.queue.remove(ent.heappos)
+		ent.rq.queue.remove(ent.heappos)
 		ent.ready <- false
 	}
 }
@@ -255,14 +266,14 @@ func (rl *RequestLimiter) remove(ent *qent) {
 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	rl.setupOnce.Do(rl.setup)
 	ent := rl.enqueue(req)
-	SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+	SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label})
 	if ent.priority == MinPriority {
 		// Note that MaxQueueTime==0 does not cancel a req
 		// that skips the queue, because in that case
 		// rl.enqueue() has already fired ready<-true and
 		// rl.remove() is a no-op.
 		go func() {
-			time.Sleep(rl.MaxQueueTimeForMinPriority)
+			time.Sleep(ent.rq.MaxQueueTimeForMinPriority)
 			rl.remove(ent)
 		}()
 	}
@@ -273,7 +284,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 		// we still need to wait for ent.ready, because
 		// sometimes runqueue() will have already decided to
 		// send true before our rl.remove() call, and in that
-		// case we'll need to decrement rl.handling below.
+		// case we'll need to decrement ent.rq.handling below.
 		ok = <-ent.ready
 	case ok = <-ent.ready:
 	}
@@ -298,7 +309,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 		default:
 			qlabel = "normal"
 		}
-		series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+		series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
 	}
 
 	if !ok {
@@ -308,9 +319,9 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 	defer func() {
 		rl.mtx.Lock()
 		defer rl.mtx.Unlock()
-		rl.handling--
+		ent.rq.handling--
 		// unblock the next waiting request
-		rl.runqueue()
+		ent.rq.runqueue()
 	}()
 	rl.Handler.ServeHTTP(resp, req)
 }
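
Usage sketch: with this change, the concurrency and queue limits move from
RequestLimiter itself to one or more RequestQueue values, and the new Queue
callback picks a queue for each incoming request. A minimal example of how a
caller might wire this up follows; the queue labels, limits, and path-prefix
routing are invented for illustration and are not taken from any Arvados
service configuration.

	package main

	import (
		"log"
		"net/http"
		"strings"

		"git.arvados.org/arvados.git/sdk/go/httpserver"
	)

	func main() {
		// Two queues with independent limits. Labels must be unique:
		// they become the "queue" label on the arvados_* metrics.
		apiQueue := &httpserver.RequestQueue{
			Label:         "api",
			MaxConcurrent: 64,
			MaxQueue:      128,
		}
		lockQueue := &httpserver.RequestQueue{
			Label:         "locks",
			MaxConcurrent: 4,
			MaxQueue:      16,
		}
		rl := &httpserver.RequestLimiter{
			Handler: http.DefaultServeMux, // the wrapped application handler
			// Queue must return a queue for every request; the
			// "/locks/" path prefix here is purely illustrative.
			Queue: func(req *http.Request) *httpserver.RequestQueue {
				if strings.HasPrefix(req.URL.Path, "/locks/") {
					return lockQueue
				}
				return apiQueue
			},
		}
		log.Fatal(http.ListenAndServe(":8080", rl))
	}

Note that the metrics registered with Registry now carry a "queue" label set
from RequestQueue.Label (in addition to the existing "priority" label on
queued_requests, queue_delay_seconds, and queue_timeout_seconds), so each
RequestQueue reports its own concurrent_requests, max_concurrent_requests, and
max_queued_requests series; dashboards that read the previously unlabeled
series will need to select or aggregate on the new label.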