20602: Add priority queue to RequestLimiter.
authorTom Clegg <tom@curii.com>
Wed, 7 Jun 2023 16:37:25 +0000 (12:37 -0400)
committerTom Clegg <tom@curii.com>
Thu, 8 Jun 2023 18:31:39 +0000 (14:31 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/service/cmd.go
sdk/go/httpserver/request_limiter.go
sdk/go/httpserver/request_limiter_test.go

index a52d09f68b1e268e399ece1fb077e79f37db9653..837ce896ea8fc78695fe21f23d543f15aa3722de 100644 (file)
@@ -223,8 +223,8 @@ Clusters:
       # parameter higher than this value, this value is used instead.
       MaxItemsPerResponse: 1000
 
-      # Maximum number of concurrent requests to accept in a single
-      # service process, or 0 for no limit.
+      # Maximum number of requests to process concurrently in a
+      # single service process, or 0 for no limit.
       MaxConcurrentRequests: 64
 
       # Fraction of MaxConcurrentRequests that can be "log create"
index cc6938cbc6e1ff3f680a4ca024fcc698e946d713..119d66191f889743de2f2407b423ca5f22663c71 100644 (file)
@@ -154,7 +154,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                                httpserver.Inspect(reg, cluster.ManagementToken,
                                        httpserver.LogRequests(
                                                interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-                                                       httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
+                                                       &httpserver.RequestLimiter{
+                                                               Handler:       handler,
+                                                               MaxConcurrent: cluster.API.MaxConcurrentRequests,
+                                                               Registry:      reg}))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
index 8889453125c4753a62927f830c4e236ecbc272a6..094f6a6caf2255d62c0f5b5af7e6cb6f5940e0a9 100644 (file)
@@ -6,87 +6,270 @@ package httpserver
 
 import (
        "net/http"
-       "sync/atomic"
+       "sync"
+       "time"
 
        "github.com/prometheus/client_golang/prometheus"
 )
 
-// RequestCounter is an http.Handler that tracks the number of
-// requests in progress.
-type RequestCounter interface {
-       http.Handler
+// RequestLimiter wraps http.Handler, limiting the number of
+// concurrent requests being handled by the wrapped Handler. Requests
+// that arrive when the handler is already at the specified
+// concurrency limit are queued and handled in the order indicated by
+// the Priority function.
+//
+// Caller must not modify any RequestLimiter fields after calling its
+// methods.
+type RequestLimiter struct {
+       Handler http.Handler
+
+       // Maximum number of requests being handled at once. Beyond
+       // this limit, requests will be queued.
+       MaxConcurrent int
+
+       // Maximum number of requests in the queue. Beyond this limit,
+       // the lowest priority requests will return 503.
+       MaxQueue int
 
-       // Current() returns the number of requests in progress.
-       Current() int
+       // Priority determines queue ordering. Requests with higher
+       // priority are handled first. Requests with equal priority
+       // are handled FIFO. If Priority is nil, all requests are
+       // handled FIFO.
+       Priority func(req *http.Request, queued time.Time) int64
 
-       // Max() returns the maximum number of concurrent requests
-       // that will be accepted.
-       Max() int
+       // "concurrent_requests", "max_concurrent_requests",
+       // "queued_requests", and "max_queued_requests" metrics are
+       // registered with Registry, if it is not nil.
+       Registry *prometheus.Registry
+
+       setupOnce sync.Once
+       mtx       sync.Mutex
+       handling  int
+       queue     heap
 }
 
-type limiterHandler struct {
-       requests chan struct{}
-       handler  http.Handler
-       count    int64 // only used if cap(requests)==0
+type qent struct {
+       queued   time.Time
+       priority int64
+       heappos  int
+       ready    chan bool // true = handle now; false = return 503 now
 }
 
-// NewRequestLimiter returns a RequestCounter that delegates up to
-// maxRequests at a time to the given handler, and responds 503 to all
-// incoming requests beyond that limit.
-//
-// "concurrent_requests" and "max_concurrent_requests" metrics are
-// registered with the given reg, if reg is not nil.
-func NewRequestLimiter(maxRequests int, handler http.Handler, reg *prometheus.Registry) RequestCounter {
-       h := &limiterHandler{
-               requests: make(chan struct{}, maxRequests),
-               handler:  handler,
+type heap []*qent
+
+func (h heap) Swap(i, j int) {
+       h[i], h[j] = h[j], h[i]
+       h[i].heappos, h[j].heappos = i, j
+}
+
+func (h heap) Less(i, j int) bool {
+       pi, pj := h[i].priority, h[j].priority
+       return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
+}
+
+func (h heap) Len() int {
+       return len(h)
+}
+
+// Move element i to a correct position in the heap. When the heap is
+// empty, fix(0) is a no-op (does not crash).
+func (h heap) fix(i int) {
+       // If the initial position is a leaf (i.e., index is greater
+       // than the last node's parent index), we only need to move it
+       // up, not down.
+       uponly := i > (len(h)-2)/2
+       // Move the new entry toward the root until reaching a
+       // position where the parent already has higher priority.
+       for i > 0 {
+               parent := (i - 1) / 2
+               if h.Less(i, parent) {
+                       h.Swap(i, parent)
+                       i = parent
+               } else {
+                       break
+               }
        }
-       if reg != nil {
-               reg.MustRegister(prometheus.NewGaugeFunc(
+       // Move i away from the root until reaching a position where
+       // both children already have lower priority.
+       for !uponly {
+               child := i*2 + 1
+               if child+1 < len(h) && h.Less(child+1, child) {
+                       // Right child has higher priority than left
+                       // child. Choose right child.
+                       child = child + 1
+               }
+               if child < len(h) && h.Less(child, i) {
+                       // Chosen child has higher priority than i.
+                       // Swap and continue down.
+                       h.Swap(i, child)
+                       i = child
+               } else {
+                       break
+               }
+       }
+}
+
+func (h *heap) add(ent *qent) {
+       ent.heappos = len(*h)
+       *h = append(*h, ent)
+       h.fix(ent.heappos)
+}
+
+func (h *heap) removeMax() *qent {
+       ent := (*h)[0]
+       if len(*h) == 1 {
+               *h = (*h)[:0]
+       } else {
+               h.Swap(0, len(*h)-1)
+               *h = (*h)[:len(*h)-1]
+               h.fix(0)
+       }
+       ent.heappos = -1
+       return ent
+}
+
+func (h *heap) remove(i int) {
+       // Move the last leaf into i's place, then move it to a
+       // correct position.
+       h.Swap(i, len(*h)-1)
+       *h = (*h)[:len(*h)-1]
+       if i < len(*h) {
+               h.fix(i)
+       }
+}
+
+func (rl *RequestLimiter) setup() {
+       if rl.Registry != nil {
+               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "concurrent_requests",
                                Help:      "Number of requests in progress",
                        },
-                       func() float64 { return float64(h.Current()) },
+                       func() float64 {
+                               rl.mtx.Lock()
+                               defer rl.mtx.Unlock()
+                               return float64(rl.handling)
+                       },
                ))
-               reg.MustRegister(prometheus.NewGaugeFunc(
+               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
                        prometheus.GaugeOpts{
                                Namespace: "arvados",
                                Name:      "max_concurrent_requests",
                                Help:      "Maximum number of concurrent requests",
                        },
-                       func() float64 { return float64(h.Max()) },
+                       func() float64 { return float64(rl.MaxConcurrent) },
+               ))
+               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "queued_requests",
+                               Help:      "Number of requests in queue",
+                       },
+                       func() float64 {
+                               rl.mtx.Lock()
+                               defer rl.mtx.Unlock()
+                               return float64(len(rl.queue))
+                       },
                ))
+               rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_queued_requests",
+                               Help:      "Maximum number of queued requests",
+                       },
+                       func() float64 { return float64(rl.MaxQueue) },
+               ))
+       }
+}
+
+// Caller must have lock.
+func (rl *RequestLimiter) runqueue() {
+       // Handle entries from the queue as capacity permits
+       for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
+               rl.handling++
+               ent := rl.queue.removeMax()
+               ent.heappos = -1
+               ent.ready <- true
        }
-       return h
 }
 
-func (h *limiterHandler) Current() int {
-       if cap(h.requests) == 0 {
-               return int(atomic.LoadInt64(&h.count))
+// If the queue is too full, fail and remove the lowest-priority
+// entry. Caller must have lock. Queue must not be empty.
+func (rl *RequestLimiter) trimqueue() {
+       if len(rl.queue) <= rl.MaxQueue {
+               return
        }
-       return len(h.requests)
+       min := 0
+       for i := range rl.queue {
+               if i == 0 || rl.queue.Less(min, i) {
+                       min = i
+               }
+       }
+       rl.queue[min].heappos = -1
+       rl.queue[min].ready <- false
+       rl.queue.remove(min)
 }
 
-func (h *limiterHandler) Max() int {
-       return cap(h.requests)
+func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
+       rl.mtx.Lock()
+       defer rl.mtx.Unlock()
+       qtime := time.Now()
+       var priority int64
+       if rl.Priority != nil {
+               priority = rl.Priority(req, qtime)
+       }
+       ent := &qent{
+               queued:   qtime,
+               priority: priority,
+               ready:    make(chan bool, 1),
+               heappos:  -1,
+       }
+       if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+               // fast path, skip the queue
+               rl.handling++
+               ent.ready <- true
+               return ent
+       }
+       rl.queue.add(ent)
+       rl.trimqueue()
+       return ent
 }
 
-func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-       if cap(h.requests) == 0 {
-               atomic.AddInt64(&h.count, 1)
-               defer atomic.AddInt64(&h.count, -1)
-               h.handler.ServeHTTP(resp, req)
-               return
+func (rl *RequestLimiter) remove(ent *qent) {
+       rl.mtx.Lock()
+       defer rl.mtx.Unlock()
+       if ent.heappos >= 0 {
+               rl.queue.remove(ent.heappos)
+               ent.heappos = -1
+               ent.ready <- false
        }
+}
+
+func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       rl.setupOnce.Do(rl.setup)
+       ent := rl.enqueue(req)
+       var ok bool
        select {
-       case h.requests <- struct{}{}:
-       default:
-               // reached max requests
+       case <-req.Context().Done():
+               rl.remove(ent)
+               // we still need to wait for ent.ready, because
+               // sometimes runqueue() will have already decided to
+               // send true before our rl.remove() call, and in that
+               // case we'll need to decrement rl.handling below.
+               ok = <-ent.ready
+       case ok = <-ent.ready:
+       }
+       if !ok {
                resp.WriteHeader(http.StatusServiceUnavailable)
                return
        }
-       h.handler.ServeHTTP(resp, req)
-       <-h.requests
+       defer func() {
+               rl.mtx.Lock()
+               defer rl.mtx.Unlock()
+               rl.handling--
+               // unblock the next waiting request
+               rl.runqueue()
+       }()
+       rl.Handler.ServeHTTP(resp, req)
 }
index 9258fbfa58f4b5a4867651fc15aba4e9b9616dcf..4023a76bc442493097c61328853b75a03311b635 100644 (file)
@@ -5,11 +5,14 @@
 package httpserver
 
 import (
+       "fmt"
        "net/http"
        "net/http/httptest"
+       "strconv"
        "sync"
-       "testing"
        "time"
+
+       check "gopkg.in/check.v1"
 )
 
 type testHandler struct {
@@ -29,9 +32,9 @@ func newTestHandler() *testHandler {
        }
 }
 
-func TestRequestLimiter1(t *testing.T) {
+func (s *Suite) TestRequestLimiter1(c *check.C) {
        h := newTestHandler()
-       l := NewRequestLimiter(1, h, nil)
+       l := RequestLimiter{MaxConcurrent: 1, Handler: h}
        var wg sync.WaitGroup
        resps := make([]*httptest.ResponseRecorder, 10)
        for i := 0; i < 10; i++ {
@@ -59,7 +62,7 @@ func TestRequestLimiter1(t *testing.T) {
        select {
        case <-done:
        case <-time.After(10 * time.Second):
-               t.Fatal("test timed out, probably deadlocked")
+               c.Fatal("test timed out, probably deadlocked")
        }
        n200 := 0
        n503 := 0
@@ -70,11 +73,11 @@ func TestRequestLimiter1(t *testing.T) {
                case 503:
                        n503++
                default:
-                       t.Fatalf("Unexpected response code %d", resps[i].Code)
+                       c.Fatalf("Unexpected response code %d", resps[i].Code)
                }
        }
        if n200 != 1 || n503 != 9 {
-               t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+               c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
        }
        // Now that all 10 are finished, an 11th request should
        // succeed.
@@ -85,13 +88,13 @@ func TestRequestLimiter1(t *testing.T) {
        resp := httptest.NewRecorder()
        l.ServeHTTP(resp, &http.Request{})
        if resp.Code != 200 {
-               t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+               c.Errorf("Got status %d on 11th request, want 200", resp.Code)
        }
 }
 
-func TestRequestLimiter10(t *testing.T) {
+func (*Suite) TestRequestLimiter10(c *check.C) {
        h := newTestHandler()
-       l := NewRequestLimiter(10, h, nil)
+       l := RequestLimiter{MaxConcurrent: 10, Handler: h}
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
                wg.Add(1)
@@ -108,3 +111,62 @@ func TestRequestLimiter10(t *testing.T) {
        }
        wg.Wait()
 }
+
+func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
+       h := newTestHandler()
+       rl := RequestLimiter{
+               MaxConcurrent: 100,
+               MaxQueue:      20,
+               Handler:       h,
+               Priority: func(r *http.Request, _ time.Time) int64 {
+                       p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
+                       return p
+               }}
+
+       c.Logf("starting initial requests")
+       for i := 0; i < rl.MaxConcurrent; i++ {
+               go func() {
+                       rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
+               }()
+       }
+       c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
+       for i := 0; i < rl.MaxConcurrent; i++ {
+               <-h.inHandler
+       }
+
+       c.Logf("starting %d priority=1 and %d priority=2 requests", rl.MaxQueue, rl.MaxQueue)
+       var wg1, wg2 sync.WaitGroup
+       wg1.Add(rl.MaxQueue)
+       wg2.Add(rl.MaxQueue)
+       for i := 0; i < rl.MaxQueue*2; i++ {
+               i := i
+               go func() {
+                       pri := (i & 1) + 1
+                       resp := httptest.NewRecorder()
+                       rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
+                       if pri == 1 {
+                               c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+                               wg1.Done()
+                       } else {
+                               c.Check(resp.Code, check.Equals, http.StatusOK)
+                               wg2.Done()
+                       }
+               }()
+       }
+
+       c.Logf("waiting for queued priority=1 requests to fail")
+       wg1.Wait()
+
+       c.Logf("allowing initial requests to proceed")
+       for i := 0; i < rl.MaxConcurrent; i++ {
+               h.okToProceed <- struct{}{}
+       }
+
+       c.Logf("allowing queued priority=2 requests to proceed")
+       for i := 0; i < rl.MaxQueue; i++ {
+               <-h.inHandler
+               h.okToProceed <- struct{}{}
+       }
+       c.Logf("waiting for queued priority=2 requests to succeed")
+       wg2.Wait()
+}