"github.com/sirupsen/logrus"
)
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
// Prometheus typically polls every 10 seconds, but it doesn't cost us
// much to also accommodate higher frequency collection by updating
// handled FIFO.
Priority func(req *http.Request, queued time.Time) int64
+ // Return 503 for any request for which Priority() returns
+ // MinPriority if it spends longer than this in the queue
+ // before starting processing.
+ MaxQueueTimeForMinPriority time.Duration
+
// "concurrent_requests", "max_concurrent_requests",
// "queued_requests", and "max_queued_requests" metrics are
// registered with Registry, if it is not nil.
ent.ready <- true
return ent
}
- if priority == IneligibleForQueuePriority {
- // Priority func is telling us to return 503
- // immediately instead of queueing, regardless of
- // queue size, if we can't handle the request
- // immediately.
- ent.ready <- false
- return ent
- }
rl.queue.add(ent)
rl.trimqueue()
return ent
rl.setupOnce.Do(rl.setup)
ent := rl.enqueue(req)
SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+ if ent.priority == MinPriority {
+ // Note that MaxQueueTimeForMinPriority==0 does not cancel
+ // a req that skips the queue, because in that case
+ // rl.enqueue() has already fired ready<-true and
+ // rl.remove() is a no-op.
+ go func() {
+ time.Sleep(rl.MaxQueueTimeForMinPriority)
+ rl.remove(ent)
+ }()
+ }
var ok bool
select {
case <-req.Context().Done():