"github.com/sirupsen/logrus"
)
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
+
+// Prometheus typically polls every 10 seconds, but it doesn't cost us
+// much to also accommodate higher frequency collection by updating
+// internal stats more frequently. (This limits time resolution only
+// for the metrics that aren't generated on the fly.)
+const metricsUpdateInterval = time.Second
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// handled FIFO.
Priority func(req *http.Request, queued time.Time) int64
+ // Return 503 for any request for which Priority() returns
+ // MinPriority if it spends longer than this in the queue
+ // before starting processing.
+ MaxQueueTimeForMinPriority time.Duration
+
// "concurrent_requests", "max_concurrent_requests",
// "queued_requests", and "max_queued_requests" metrics are
// registered with Registry, if it is not nil.
Registry *prometheus.Registry
- setupOnce sync.Once
- mtx sync.Mutex
- handling int
- queue queue
+ setupOnce sync.Once
+ mQueueDelay *prometheus.SummaryVec
+ mQueueTimeout *prometheus.SummaryVec
+ mQueueUsage *prometheus.GaugeVec
+ mtx sync.Mutex
+ handling int
+ queue queue
}
type qent struct {
},
func() float64 { return float64(rl.MaxConcurrent) },
))
- rl.Registry.MustRegister(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Name: "queued_requests",
- Help: "Number of requests in queue",
- },
- func() float64 {
- rl.mtx.Lock()
- defer rl.mtx.Unlock()
- return float64(len(rl.queue))
- },
- ))
+ rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "queued_requests",
+ Help: "Number of requests in queue",
+ }, []string{"priority"})
+ rl.Registry.MustRegister(rl.mQueueUsage)
rl.Registry.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
},
func() float64 { return float64(rl.MaxQueue) },
))
+ rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Name: "queue_delay_seconds",
+ Help: "Time spent in the incoming request queue before start of processing",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ }, []string{"priority"})
+ rl.Registry.MustRegister(rl.mQueueDelay)
+ rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Name: "queue_timeout_seconds",
+ Help: "Time spent in the incoming request queue before client timed out or disconnected",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ }, []string{"priority"})
+ rl.Registry.MustRegister(rl.mQueueTimeout)
+ go func() {
+ for range time.NewTicker(metricsUpdateInterval).C {
+ var low, normal, high int
+ rl.mtx.Lock()
+ for _, ent := range rl.queue {
+ switch {
+ case ent.priority < 0:
+ low++
+ case ent.priority > 0:
+ high++
+ default:
+ normal++
+ }
+ }
+ rl.mtx.Unlock()
+ rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
+ rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
+ rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
+ }
+ }()
}
}
ent.ready <- true
return ent
}
- if priority == IneligibleForQueuePriority {
- // Priority func is telling us to return 503
- // immediately instead of queueing, regardless of
- // queue size, if we can't handle the request
- // immediately.
- ent.ready <- false
- return ent
- }
rl.queue.add(ent)
rl.trimqueue()
return ent
rl.setupOnce.Do(rl.setup)
ent := rl.enqueue(req)
SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+ if ent.priority == MinPriority {
+ // Note that MaxQueueTimeForMinPriority==0 does not
+ // cancel a req that skips the queue, because in that
+ // case rl.enqueue() has already fired ready<-true and
+ // rl.remove() is a no-op.
+ go func() {
+ time.Sleep(rl.MaxQueueTimeForMinPriority)
+ rl.remove(ent)
+ }()
+ }
var ok bool
select {
case <-req.Context().Done():
ok = <-ent.ready
case ok = <-ent.ready:
}
+
+ // Report time spent in queue in the appropriate bucket:
+ // mQueueDelay if the request actually got processed,
+ // mQueueTimeout if it was abandoned or cancelled before
+ // getting a processing slot.
+ var series *prometheus.SummaryVec
+ if ok {
+ series = rl.mQueueDelay
+ } else {
+ series = rl.mQueueTimeout
+ }
+ if series != nil {
+ var qlabel string
+ switch {
+ case ent.priority < 0:
+ qlabel = "low"
+ case ent.priority > 0:
+ qlabel = "high"
+ default:
+ qlabel = "normal"
+ }
+ series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+ }
+
if !ok {
resp.WriteHeader(http.StatusServiceUnavailable)
return