20602: Allow lock requests to queue up to 2s (configurable).
[arvados.git] / sdk / go / httpserver / request_limiter.go
index 33a2eb61464f091736ea14cd5a87f66a62aa4516..9d501ab0ebfa7db908a2886d4b208973c8606863 100644 (file)
@@ -15,7 +15,7 @@ import (
        "github.com/sirupsen/logrus"
 )
 
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
 
 // Prometheus typically polls every 10 seconds, but it doesn't cost us
 // much to also accommodate higher frequency collection by updating
@@ -48,6 +48,11 @@ type RequestLimiter struct {
        // handled FIFO.
        Priority func(req *http.Request, queued time.Time) int64
 
+       // Return 503 for any request for which Priority() returns
+       // MinPriority if it spends longer than this in the queue
+       // before starting processing.
+       MaxQueueTimeForMinPriority time.Duration
+
        // "concurrent_requests", "max_concurrent_requests",
        // "queued_requests", and "max_queued_requests" metrics are
        // registered with Registry, if it is not nil.
@@ -233,14 +238,6 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
                ent.ready <- true
                return ent
        }
-       if priority == IneligibleForQueuePriority {
-               // Priority func is telling us to return 503
-               // immediately instead of queueing, regardless of
-               // queue size, if we can't handle the request
-               // immediately.
-               ent.ready <- false
-               return ent
-       }
        rl.queue.add(ent)
        rl.trimqueue()
        return ent
@@ -259,6 +256,16 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
        rl.setupOnce.Do(rl.setup)
        ent := rl.enqueue(req)
        SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+       if ent.priority == MinPriority {
+               // Note that MaxQueueTime==0 does not cancel a req
+               // that skips the queue, because in that case
+               // rl.enqueue() has already fired ready<-true and
+               // rl.remove() is a no-op.
+               go func() {
+                       time.Sleep(rl.MaxQueueTimeForMinPriority)
+                       rl.remove(ent)
+               }()
+       }
        var ok bool
        select {
        case <-req.Context().Done():