20602: Allow lock requests to queue up to 2s (configurable). 20602-queue-metrics
author Tom Clegg <tom@curii.com>
Mon, 19 Jun 2023 20:41:18 +0000 (16:41 -0400)
committer Tom Clegg <tom@curii.com>
Mon, 19 Jun 2023 20:41:18 +0000 (16:41 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/config/export.go
lib/service/cmd.go
sdk/go/arvados/config.go
sdk/go/httpserver/request_limiter.go
sdk/go/httpserver/request_limiter_test.go

index 33c1e497de3fc8bbdb50b4d8cc7386eda1fa3937..0fb4a2babdf8057ed8291f7d280887f6d5a99617 100644 (file)
@@ -236,6 +236,10 @@ Clusters:
       # additional requests while at the MaxConcurrentRequests limit.
       MaxQueuedRequests: 64
 
+      # Maximum time a "lock container" request is allowed to wait in
+      # the incoming request queue before returning 503.
+      MaxQueueTimeForLockRequests: 2s
+
       # Fraction of MaxConcurrentRequests that can be "log create"
       # messages at any given time.  This is to prevent logging
       # updates from crowding out more important requests.
index 565be2fb7614f9bfcd8f4b12cb8d16206fca809e..f46f5b6f843542309df4f93b41208853dd285eee 100644 (file)
@@ -73,6 +73,7 @@ var whitelist = map[string]bool{
        "API.MaxItemsPerResponse":                  true,
        "API.MaxKeepBlobBuffers":                   false,
        "API.MaxQueuedRequests":                    false,
+       "API.MaxQueueTimeForLockRequests":          false,
        "API.MaxRequestAmplification":              false,
        "API.MaxRequestSize":                       true,
        "API.MaxTokenLifetime":                     false,
index f4d4ec8efe4b8878d97fafdfc70ba6e364f92689..854b94861f1362f9e58592f771400bed28b9afaa 100644 (file)
@@ -155,11 +155,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                                        httpserver.LogRequests(
                                                interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
                                                        &httpserver.RequestLimiter{
-                                                               Handler:       handler,
-                                                               MaxConcurrent: cluster.API.MaxConcurrentRequests,
-                                                               MaxQueue:      cluster.API.MaxQueuedRequests,
-                                                               Priority:      c.requestPriority,
-                                                               Registry:      reg}))))))
+                                                               Handler:                    handler,
+                                                               MaxConcurrent:              cluster.API.MaxConcurrentRequests,
+                                                               MaxQueue:                   cluster.API.MaxQueuedRequests,
+                                                               MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+                                                               Priority:                   c.requestPriority,
+                                                               Registry:                   reg}))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -261,7 +262,7 @@ func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
                // Return 503 immediately instead of queueing. We want
                // to send feedback to dispatchcloud ASAP to stop
                // bringing up new containers.
-               return httpserver.IneligibleForQueuePriority
+               return httpserver.MinPriority
        case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
                // "Create log entry" is the most harmless kind of
                // request to drop. Negative priority is called "low"
index 2e9abf2ecb4c36904e36da2dff15a4c8eea9ea2a..0fafa41f90a8cb34c071f72ac692c1797860a9a8 100644 (file)
@@ -101,6 +101,7 @@ type Cluster struct {
                MaxItemsPerResponse              int
                MaxConcurrentRequests            int
                MaxQueuedRequests                int
+               MaxQueueTimeForLockRequests      Duration
                LogCreateRequestFraction         float64
                MaxKeepBlobBuffers               int
                MaxRequestAmplification          int
index 33a2eb61464f091736ea14cd5a87f66a62aa4516..9d501ab0ebfa7db908a2886d4b208973c8606863 100644 (file)
@@ -15,7 +15,7 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
 
 // Prometheus typically polls every 10 seconds, but it doesn't cost us
 // much to also accommodate higher frequency collection by updating
@@ -48,6 +48,11 @@ type RequestLimiter struct {
        // handled FIFO.
        Priority func(req *http.Request, queued time.Time) int64
 
+       // Return 503 for any request for which Priority() returns
+       // MinPriority if it spends longer than this in the queue
+       // before starting processing.
+       MaxQueueTimeForMinPriority time.Duration
+
        // "concurrent_requests", "max_concurrent_requests",
        // "queued_requests", and "max_queued_requests" metrics are
        // registered with Registry, if it is not nil.
@@ -233,14 +238,6 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
                ent.ready <- true
                return ent
        }
-       if priority == IneligibleForQueuePriority {
-               // Priority func is telling us to return 503
-               // immediately instead of queueing, regardless of
-               // queue size, if we can't handle the request
-               // immediately.
-               ent.ready <- false
-               return ent
-       }
        rl.queue.add(ent)
        rl.trimqueue()
        return ent
@@ -259,6 +256,16 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
        rl.setupOnce.Do(rl.setup)
        ent := rl.enqueue(req)
        SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+       if ent.priority == MinPriority {
+               // Note that MaxQueueTimeForMinPriority==0 does not
+               // cancel a req that skips the queue, because in that
+               // case rl.enqueue() has already fired ready<-true and
+               // rl.remove() is a no-op.
+               go func() {
+                       time.Sleep(rl.MaxQueueTimeForMinPriority)
+                       rl.remove(ent)
+               }()
+       }
        var ok bool
        select {
        case <-req.Context().Done():
index b04ff57cc176fca18d58773565cee1a55f3a5357..55f13b4625fdf1c637dfba721b4ee8f00af2ecc3 100644 (file)
@@ -134,19 +134,40 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
                <-h.inHandler
        }
 
-       c.Logf("starting %d priority=IneligibleForQueuePriority requests (should respond 503 immediately)", rl.MaxQueue)
+       c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
        var wgX sync.WaitGroup
        for i := 0; i < rl.MaxQueue; i++ {
                wgX.Add(1)
                go func() {
                        defer wgX.Done()
                        resp := httptest.NewRecorder()
-                       rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", IneligibleForQueuePriority)}}})
+                       rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
                        c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
                }()
        }
        wgX.Wait()
 
+       c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
+       // Usage docs say the caller isn't allowed to change fields
+       // after first use, but we secretly know it's OK to change
+       // this field on the fly as long as no requests are arriving
+       // concurrently.
+       rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
+       for i := 0; i < rl.MaxQueue; i++ {
+               wgX.Add(1)
+               go func() {
+                       defer wgX.Done()
+                       resp := httptest.NewRecorder()
+                       t0 := time.Now()
+                       rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
+                       c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+                       elapsed := time.Since(t0)
+                       c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
+                       c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
+               }()
+       }
+       wgX.Wait()
+
        c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
        var wg1, wg2 sync.WaitGroup
        wg1.Add(rl.MaxQueue)