From 53c44ca2f2e9e2df9d5eee96e7806fbb7576eadd Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Wed, 7 Jun 2023 12:37:25 -0400
Subject: [PATCH] 20602: Add priority queue to RequestLimiter.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
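Usage note: NewRequestLimiter() is gone; callers now fill in a
RequestLimiter struct directly, as in the lib/service/cmd.go hunk
below. lib/service leaves MaxQueue at its zero value, which keeps the
old behavior of returning 503 immediately instead of queueing. A
minimal sketch of the new API (the values, the Priority policy, and
the git.arvados.org module path here are illustrative, not part of
this patch):

    package main

    import (
        "log"
        "net/http"
        "time"

        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        mux := http.NewServeMux()
        mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            w.Write([]byte("ok\n"))
        })
        limiter := &httpserver.RequestLimiter{
            Handler:       mux,
            MaxConcurrent: 64,  // parallel requests before queueing starts
            MaxQueue:      100, // queued requests before 503s start
            // Requests with higher Priority values are handled
            // first; ties are FIFO by queue arrival time.
            Priority: func(req *http.Request, queued time.Time) int64 {
                // Hypothetical policy: prefer reads over writes.
                if req.Method == "GET" || req.Method == "HEAD" {
                    return 1
                }
                return 0
            },
            // Optional: registers concurrent/queued request gauges.
            Registry: prometheus.NewRegistry(),
        }
        log.Fatal(http.ListenAndServe(":8080", limiter))
    }
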
 lib/config/config.default.yml             |   4 +-
 lib/service/cmd.go                        |   5 +-
 sdk/go/httpserver/request_limiter.go      | 290 ++++++++++++++++++----
 sdk/go/httpserver/request_limiter_test.go |  85 ++++++-
 4 files changed, 325 insertions(+), 59 deletions(-)

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a52d09f68b..837ce896ea 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -223,8 +223,8 @@ Clusters:
       # parameter higher than this value, this value is used instead.
       MaxItemsPerResponse: 1000
 
-      # Maximum number of concurrent requests to accept in a single
-      # service process, or 0 for no limit.
+      # Maximum number of requests to process concurrently in a
+      # single service process, or 0 for no limit.
       MaxConcurrentRequests: 64
 
       # Fraction of MaxConcurrentRequests that can be "log create"
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index cc6938cbc6..119d66191f 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -154,7 +154,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 		httpserver.Inspect(reg, cluster.ManagementToken,
 			httpserver.LogRequests(
 				interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-					httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
+					&httpserver.RequestLimiter{
+						Handler:       handler,
+						MaxConcurrent: cluster.API.MaxConcurrentRequests,
+						Registry:      reg})))))))
 	srv := &httpserver.Server{
 		Server: http.Server{
 			Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 8889453125..094f6a6caf 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -6,87 +6,283 @@ package httpserver
 
 import (
 	"net/http"
-	"sync/atomic"
+	"sync"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-// RequestCounter is an http.Handler that tracks the number of
-// requests in progress.
-type RequestCounter interface {
-	http.Handler
+// RequestLimiter wraps http.Handler, limiting the number of
+// concurrent requests being handled by the wrapped Handler. Requests
+// that arrive when the handler is already at the specified
+// concurrency limit are queued and handled in the order indicated by
+// the Priority function.
+//
+// Caller must not modify any RequestLimiter fields after calling its
+// methods.
+type RequestLimiter struct {
+	Handler http.Handler
 
-	// Current() returns the number of requests in progress.
-	Current() int
+	// Maximum number of requests being handled at once. Beyond
+	// this limit, requests will be queued.
+	MaxConcurrent int
 
-	// Max() returns the maximum number of concurrent requests
-	// that will be accepted.
-	Max() int
+	// Maximum number of requests in the queue. Beyond this limit,
+	// the lowest priority requests will return 503.
+	MaxQueue int
+
+	// Priority determines queue ordering. Requests with higher
+	// priority are handled first. Requests with equal priority
+	// are handled FIFO. If Priority is nil, all requests are
+	// handled FIFO.
+	Priority func(req *http.Request, queued time.Time) int64
+
+	// "concurrent_requests", "max_concurrent_requests",
+	// "queued_requests", and "max_queued_requests" metrics are
+	// registered with Registry, if it is not nil.
+	Registry *prometheus.Registry
+
+	setupOnce sync.Once
+	mtx       sync.Mutex
+	handling  int
+	queue     heap
 }
 
-type limiterHandler struct {
-	requests chan struct{}
-	handler  http.Handler
-	count    int64 // only used if cap(requests)==0
+type qent struct {
+	queued   time.Time
+	priority int64
+	heappos  int
+	ready    chan bool // true = handle now; false = return 503 now
 }
 
-// NewRequestLimiter returns a RequestCounter that delegates up to
-// maxRequests at a time to the given handler, and responds 503 to all
-// incoming requests beyond that limit.
-//
-// "concurrent_requests" and "max_concurrent_requests" metrics are
-// registered with the given reg, if reg is not nil.
-func NewRequestLimiter(maxRequests int, handler http.Handler, reg *prometheus.Registry) RequestCounter {
-	h := &limiterHandler{
-		requests: make(chan struct{}, maxRequests),
-		handler:  handler,
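+// heap implements an array-backed binary max-heap of *qent, ordered
+// by Less: higher priority first, ties broken by earlier queued
+// time. The children of the entry at index i are at indexes 2*i+1
+// and 2*i+2, and its parent is at index (i-1)/2, so the root (index
+// 0) is always the next entry that should be handled.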
+type heap []*qent
+
+func (h heap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+	h[i].heappos, h[j].heappos = i, j
+}
+
+func (h heap) Less(i, j int) bool {
+	pi, pj := h[i].priority, h[j].priority
+	return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
+}
+
+func (h heap) Len() int {
+	return len(h)
+}
+
+// Move element i to a correct position in the heap. When the heap is
+// empty, fix(0) is a no-op (does not crash).
+func (h heap) fix(i int) {
+	// If the initial position is a leaf (i.e., index is greater
+	// than the last node's parent index), we only need to move it
+	// up, not down.
+	uponly := i > (len(h)-2)/2
+	// Move the new entry toward the root until reaching a
+	// position where the parent already has higher priority.
+	for i > 0 {
+		parent := (i - 1) / 2
+		if h.Less(i, parent) {
+			h.Swap(i, parent)
+			i = parent
+		} else {
+			break
+		}
 	}
-	if reg != nil {
-		reg.MustRegister(prometheus.NewGaugeFunc(
+	// Move i away from the root until reaching a position where
+	// both children already have lower priority.
+	for !uponly {
+		child := i*2 + 1
+		if child+1 < len(h) && h.Less(child+1, child) {
+			// Right child has higher priority than left
+			// child. Choose right child.
+			child = child + 1
+		}
+		if child < len(h) && h.Less(child, i) {
+			// Chosen child has higher priority than i.
+			// Swap and continue down.
+			h.Swap(i, child)
+			i = child
+		} else {
+			break
+		}
+	}
+}
+
+func (h *heap) add(ent *qent) {
+	ent.heappos = len(*h)
+	*h = append(*h, ent)
+	h.fix(ent.heappos)
+}
+
+func (h *heap) removeMax() *qent {
+	ent := (*h)[0]
+	if len(*h) == 1 {
+		*h = (*h)[:0]
+	} else {
+		h.Swap(0, len(*h)-1)
+		*h = (*h)[:len(*h)-1]
+		h.fix(0)
+	}
+	ent.heappos = -1
+	return ent
+}
+
+func (h *heap) remove(i int) {
+	// Move the last leaf into i's place, then move it to a
+	// correct position.
+	h.Swap(i, len(*h)-1)
+	*h = (*h)[:len(*h)-1]
+	if i < len(*h) {
+		h.fix(i)
+	}
+}
+
+func (rl *RequestLimiter) setup() {
+	if rl.Registry != nil {
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
 			prometheus.GaugeOpts{
 				Namespace: "arvados",
 				Name:      "concurrent_requests",
 				Help:      "Number of requests in progress",
 			},
-			func() float64 { return float64(h.Current()) },
+			func() float64 {
+				rl.mtx.Lock()
+				defer rl.mtx.Unlock()
+				return float64(rl.handling)
+			},
 		))
-		reg.MustRegister(prometheus.NewGaugeFunc(
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
 			prometheus.GaugeOpts{
 				Namespace: "arvados",
 				Name:      "max_concurrent_requests",
 				Help:      "Maximum number of concurrent requests",
 			},
-			func() float64 { return float64(h.Max()) },
+			func() float64 { return float64(rl.MaxConcurrent) },
 		))
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "queued_requests",
+				Help:      "Number of requests in queue",
+			},
+			func() float64 {
+				rl.mtx.Lock()
+				defer rl.mtx.Unlock()
+				return float64(len(rl.queue))
+			},
+		))
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_queued_requests",
+				Help:      "Maximum number of queued requests",
+			},
+			func() float64 { return float64(rl.MaxQueue) },
+		))
+	}
+}
+
+// caller must have lock
+func (rl *RequestLimiter) runqueue() {
+	// Handle entries from the queue as capacity permits.
+	// (removeMax already resets each entry's heappos to -1.)
+	for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
+		rl.handling++
+		ent := rl.queue.removeMax()
+		ent.ready <- true
 	}
-	return h
 }
 
-func (h *limiterHandler) Current() int {
-	if cap(h.requests) == 0 {
-		return int(atomic.LoadInt64(&h.count))
+// If the queue is too full, fail and remove the lowest-priority
+// entry. Caller must have lock. (An empty queue is handled by the
+// early return: len 0 is never greater than MaxQueue.)
+func (rl *RequestLimiter) trimqueue() {
+	if len(rl.queue) <= rl.MaxQueue {
+		return
 	}
-	return len(h.requests)
+	min := 0
+	for i := range rl.queue {
+		if i == 0 || rl.queue.Less(min, i) {
+			min = i
+		}
+	}
+	// Grab the entry before removing it: remove(min) swaps
+	// another entry into min's slot and rewrites heappos, so
+	// clear heappos only after removal.
+	ent := rl.queue[min]
+	rl.queue.remove(min)
+	ent.heappos = -1
+	ent.ready <- false
 }
 
-func (h *limiterHandler) Max() int {
-	return cap(h.requests)
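+// enqueue adds the incoming request to the priority queue, or skips
+// the queue if there is spare capacity, and returns a qent whose
+// ready channel will receive true when it is time to handle the
+// request, or false if it should fail with 503. When MaxQueue is 0,
+// trimqueue fails any request that cannot be handled immediately,
+// matching the old NewRequestLimiter behavior.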
+func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
+	rl.mtx.Lock()
+	defer rl.mtx.Unlock()
+	qtime := time.Now()
+	var priority int64
+	if rl.Priority != nil {
+		priority = rl.Priority(req, qtime)
+	}
+	ent := &qent{
+		queued:   qtime,
+		priority: priority,
+		ready:    make(chan bool, 1),
+		heappos:  -1,
+	}
+	if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+		// fast path, skip the queue
+		rl.handling++
+		ent.ready <- true
+		return ent
+	}
+	rl.queue.add(ent)
+	rl.trimqueue()
+	return ent
 }
 
-func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	if cap(h.requests) == 0 {
-		atomic.AddInt64(&h.count, 1)
-		defer atomic.AddInt64(&h.count, -1)
-		h.handler.ServeHTTP(resp, req)
-		return
+func (rl *RequestLimiter) remove(ent *qent) {
+	rl.mtx.Lock()
+	defer rl.mtx.Unlock()
+	if ent.heappos >= 0 {
+		rl.queue.remove(ent.heappos)
+		ent.heappos = -1
+		ent.ready <- false
 	}
+}
+
+func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	rl.setupOnce.Do(rl.setup)
+	ent := rl.enqueue(req)
+	var ok bool
 	select {
-	case h.requests <- struct{}{}:
-	default:
-		// reached max requests
+	case <-req.Context().Done():
+		rl.remove(ent)
+		// we still need to wait for ent.ready, because
+		// sometimes runqueue() will have already decided to
+		// send true before our rl.remove() call, and in that
+		// case we'll need to decrement rl.handling below.
+		ok = <-ent.ready
+	case ok = <-ent.ready:
+	}
+	if !ok {
 		resp.WriteHeader(http.StatusServiceUnavailable)
 		return
 	}
-	h.handler.ServeHTTP(resp, req)
-	<-h.requests
+	defer func() {
+		rl.mtx.Lock()
+		defer rl.mtx.Unlock()
+		rl.handling--
+		// unblock the next waiting request
+		rl.runqueue()
+	}()
+	rl.Handler.ServeHTTP(resp, req)
 }
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 9258fbfa58..4023a76bc4 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -5,11 +5,14 @@
 package httpserver
 
 import (
+	"fmt"
 	"net/http"
 	"net/http/httptest"
+	"strconv"
 	"sync"
-	"testing"
 	"time"
+
+	check "gopkg.in/check.v1"
 )
 
 type testHandler struct {
@@ -29,9 +32,9 @@ func newTestHandler() *testHandler {
 	}
 }
 
-func TestRequestLimiter1(t *testing.T) {
+func (s *Suite) TestRequestLimiter1(c *check.C) {
 	h := newTestHandler()
-	l := NewRequestLimiter(1, h, nil)
+	l := RequestLimiter{MaxConcurrent: 1, Handler: h}
 	var wg sync.WaitGroup
 	resps := make([]*httptest.ResponseRecorder, 10)
 	for i := 0; i < 10; i++ {
@@ -59,7 +62,7 @@ func TestRequestLimiter1(t *testing.T) {
 	select {
 	case <-done:
 	case <-time.After(10 * time.Second):
-		t.Fatal("test timed out, probably deadlocked")
+		c.Fatal("test timed out, probably deadlocked")
 	}
 	n200 := 0
 	n503 := 0
@@ -70,11 +73,11 @@ func TestRequestLimiter1(t *testing.T) {
 		case 503:
 			n503++
 		default:
-			t.Fatalf("Unexpected response code %d", resps[i].Code)
+			c.Fatalf("Unexpected response code %d", resps[i].Code)
 		}
 	}
 	if n200 != 1 || n503 != 9 {
-		t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+		c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
 	}
 	// Now that all 10 are finished, an 11th request should
 	// succeed.
@@ -85,13 +88,13 @@ func TestRequestLimiter1(t *testing.T) {
 	resp := httptest.NewRecorder()
 	l.ServeHTTP(resp, &http.Request{})
 	if resp.Code != 200 {
-		t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+		c.Errorf("Got status %d on 11th request, want 200", resp.Code)
 	}
 }
 
-func TestRequestLimiter10(t *testing.T) {
+func (*Suite) TestRequestLimiter10(c *check.C) {
 	h := newTestHandler()
-	l := NewRequestLimiter(10, h, nil)
+	l := RequestLimiter{MaxConcurrent: 10, Handler: h}
 	var wg sync.WaitGroup
 	for i := 0; i < 10; i++ {
 		wg.Add(1)
@@ -108,3 +111,67 @@ func TestRequestLimiter10(t *testing.T) {
 	}
 	wg.Wait()
 }
+
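+// TestRequestLimiterQueuePriority fills every MaxConcurrent slot,
+// then queues 2*MaxQueue requests with alternating priorities 1 and
+// 2. The priority=2 requests should evict the priority=1 requests
+// from the queue (503), then be handled themselves once the initial
+// requests are allowed to finish.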
+func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
+	h := newTestHandler()
+	rl := RequestLimiter{
+		MaxConcurrent: 100,
+		MaxQueue:      20,
+		Handler:       h,
+		Priority: func(r *http.Request, _ time.Time) int64 {
+			p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
+			return p
+		}}
+
+	c.Logf("starting initial requests")
+	for i := 0; i < rl.MaxConcurrent; i++ {
+		go func() {
+			rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
+		}()
+	}
+	c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
+	for i := 0; i < rl.MaxConcurrent; i++ {
+		<-h.inHandler
+	}
+
+	c.Logf("starting %d priority=1 and %d priority=2 requests", rl.MaxQueue, rl.MaxQueue)
+	var wg1, wg2 sync.WaitGroup
+	wg1.Add(rl.MaxQueue)
+	wg2.Add(rl.MaxQueue)
+	for i := 0; i < rl.MaxQueue*2; i++ {
+		i := i
+		go func() {
+			pri := (i & 1) + 1
+			resp := httptest.NewRecorder()
+			rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
+			if pri == 1 {
+				c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+				wg1.Done()
+			} else {
+				c.Check(resp.Code, check.Equals, http.StatusOK)
+				wg2.Done()
+			}
+		}()
+	}
+
+	c.Logf("waiting for queued priority=1 requests to fail")
+	wg1.Wait()
+
+	c.Logf("allowing initial requests to proceed")
+	for i := 0; i < rl.MaxConcurrent; i++ {
+		h.okToProceed <- struct{}{}
+	}
+
+	c.Logf("allowing queued priority=2 requests to proceed")
+	for i := 0; i < rl.MaxQueue; i++ {
+		<-h.inHandler
+		h.okToProceed <- struct{}{}
+	}
+	c.Logf("waiting for queued priority=2 requests to succeed")
+	wg2.Wait()
+}
-- 
2.30.2