# additional requests while at the MaxConcurrentRequests limit.
MaxQueuedRequests: 64
+ # Maximum time a "lock container" request is allowed to wait in
+ # the incoming request queue before returning 503.
+ MaxQueueTimeForLockRequests: 2s
+
# Fraction of MaxConcurrentRequests that can be "log create"
# messages at any given time. This is to prevent logging
# updates from crowding out more important requests.
"API.MaxItemsPerResponse": true,
"API.MaxKeepBlobBuffers": false,
"API.MaxQueuedRequests": false,
+ "API.MaxQueueTimeForLockRequests": false,
"API.MaxRequestAmplification": false,
"API.MaxRequestSize": true,
"API.MaxTokenLifetime": false,
httpserver.LogRequests(
interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
&httpserver.RequestLimiter{
- Handler: handler,
- MaxConcurrent: cluster.API.MaxConcurrentRequests,
- MaxQueue: cluster.API.MaxQueuedRequests,
- Priority: c.requestPriority,
- Registry: reg}))))))
+ Handler: handler,
+ MaxConcurrent: cluster.API.MaxConcurrentRequests,
+ MaxQueue: cluster.API.MaxQueuedRequests,
+ MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+ Priority: c.requestPriority,
+ Registry: reg}))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
// Return 503 (after at most MaxQueueTimeForLockRequests
// in the queue) instead of queueing indefinitely. We want
// to send feedback to dispatchcloud ASAP to stop
// bringing up new containers.
- return httpserver.IneligibleForQueuePriority
+ return httpserver.MinPriority
case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
// "Create log entry" is the most harmless kind of
// request to drop. Negative priority is called "low"
MaxItemsPerResponse int
MaxConcurrentRequests int
MaxQueuedRequests int
+ MaxQueueTimeForLockRequests Duration
LogCreateRequestFraction float64
MaxKeepBlobBuffers int
MaxRequestAmplification int
"github.com/sirupsen/logrus"
)
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
// Prometheus typically polls every 10 seconds, but it doesn't cost us
// much to also accommodate higher frequency collection by updating
// handled FIFO.
Priority func(req *http.Request, queued time.Time) int64
+ // If a request's Priority() is MinPriority and it has
+ // spent longer than this waiting in the queue, return 503
+ // instead of starting to process it.
+ MaxQueueTimeForMinPriority time.Duration
+
// "concurrent_requests", "max_concurrent_requests",
// "queued_requests", and "max_queued_requests" metrics are
// registered with Registry, if it is not nil.
ent.ready <- true
return ent
}
- if priority == IneligibleForQueuePriority {
- // Priority func is telling us to return 503
- // immediately instead of queueing, regardless of
- // queue size, if we can't handle the request
- // immediately.
- ent.ready <- false
- return ent
- }
rl.queue.add(ent)
rl.trimqueue()
return ent
rl.setupOnce.Do(rl.setup)
ent := rl.enqueue(req)
SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+ if ent.priority == MinPriority {
+ // Note that MaxQueueTimeForMinPriority==0 does not cancel
+ // a req that skips the queue, because in that case
+ // rl.enqueue() has already fired ready<-true and
+ // rl.remove() is a no-op.
+ go func() {
+ time.Sleep(rl.MaxQueueTimeForMinPriority)
+ rl.remove(ent)
+ }()
+ }
var ok bool
select {
case <-req.Context().Done():
<-h.inHandler
}
- c.Logf("starting %d priority=IneligibleForQueuePriority requests (should respond 503 immediately)", rl.MaxQueue)
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
var wgX sync.WaitGroup
for i := 0; i < rl.MaxQueue; i++ {
wgX.Add(1)
go func() {
defer wgX.Done()
resp := httptest.NewRecorder()
- rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", IneligibleForQueuePriority)}}})
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
}()
}
wgX.Wait()
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
+ // Usage docs say the caller isn't allowed to change fields
+ // after first use, but we secretly know it's OK to change
+ // this field on the fly as long as no requests are arriving
+ // concurrently.
+ rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
+ for i := 0; i < rl.MaxQueue; i++ {
+ wgX.Add(1)
+ go func() {
+ defer wgX.Done()
+ resp := httptest.NewRecorder()
+ t0 := time.Now()
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
+ c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+ elapsed := time.Since(t0)
+ c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
+ c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
+ }()
+ }
+ wgX.Wait()
+
c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
var wg1, wg2 sync.WaitGroup
wg1.Add(rl.MaxQueue)