+// remove deletes the element at index i while preserving the heap
+// invariant.
+func (h *heap) remove(i int) {
+	// Replace i with the final leaf, shrink the heap by one, then
+	// restore heap order at i -- unless i was the final leaf
+	// itself, in which case there is nothing left to fix.
+	last := len(*h) - 1
+	h.Swap(i, last)
+	*h = (*h)[:last]
+	if i < last {
+		h.fix(i)
+	}
+}
+
+// setup registers the limiter's metrics with rl.Registry, if one is
+// configured. The value callbacks run at scrape time; the ones that
+// read mutable state take rl.mtx.
+func (rl *RequestLimiter) setup() {
+	if rl.Registry == nil {
+		return
+	}
+	for _, metric := range []struct {
+		name  string
+		help  string
+		value func() float64
+	}{
+		{"concurrent_requests", "Number of requests in progress", func() float64 {
+			rl.mtx.Lock()
+			defer rl.mtx.Unlock()
+			return float64(rl.handling)
+		}},
+		{"max_concurrent_requests", "Maximum number of concurrent requests", func() float64 {
+			return float64(rl.MaxConcurrent)
+		}},
+		{"queued_requests", "Number of requests in queue", func() float64 {
+			rl.mtx.Lock()
+			defer rl.mtx.Unlock()
+			return float64(len(rl.queue))
+		}},
+		{"max_queued_requests", "Maximum number of queued requests", func() float64 {
+			return float64(rl.MaxQueue)
+		}},
+	} {
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      metric.name,
+				Help:      metric.help,
+			},
+			metric.value,
+		))
+	}
+}
+
+// runqueue pops queued entries in priority order and marks each one
+// as being handled, until the queue is empty or the concurrency
+// limit is reached. Caller must have lock.
+func (rl *RequestLimiter) runqueue() {
+	for len(rl.queue) > 0 {
+		if rl.MaxConcurrent > 0 && rl.handling >= rl.MaxConcurrent {
+			// At capacity -- leave the rest queued.
+			return
+		}
+		rl.handling++
+		next := rl.queue.removeMax()
+		next.heappos = -1
+		next.ready <- true
+	}
+}
+
+// If the queue is too full, fail and remove the lowest-priority
+// entries until the queue is within MaxQueue. Evicted waiters are
+// notified via ready <- false. Caller must have lock.
+//
+// Looping (rather than evicting exactly one entry) keeps the queue
+// within bounds even if MaxQueue is lowered between calls, and makes
+// an empty queue a harmless no-op instead of a precondition.
+func (rl *RequestLimiter) trimqueue() {
+	for len(rl.queue) > 0 && len(rl.queue) > rl.MaxQueue {
+		// Linear scan for the lowest-priority entry. (The heap
+		// is ordered by max, so the minimum can be any leaf.)
+		worst := 0
+		for i := range rl.queue {
+			if rl.queue.Less(worst, i) {
+				worst = i
+			}
+		}
+		rl.queue[worst].heappos = -1
+		rl.queue[worst].ready <- false
+		rl.queue.remove(worst)
+	}
+}
+
+// enqueue either admits req immediately (when below the concurrency
+// limit) or places it on the priority queue. The returned entry's
+// ready channel reports whether the request should be handled (true)
+// or rejected (false).
+func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
+	rl.mtx.Lock()
+	defer rl.mtx.Unlock()
+	now := time.Now()
+	var pri int64
+	if rl.Priority != nil {
+		pri = rl.Priority(req, now)
+	}
+	ent := &qent{
+		queued:   now,
+		priority: pri,
+		ready:    make(chan bool, 1),
+		heappos:  -1,
+	}
+	if rl.MaxConcurrent != 0 && rl.handling >= rl.MaxConcurrent {
+		// At capacity: wait in the queue, evicting a
+		// lower-priority waiter if the queue is over limit.
+		rl.queue.add(ent)
+		rl.trimqueue()
+		return ent
+	}
+	// Below the limit -- fast path, skip the queue.
+	rl.handling++
+	ent.ready <- true
+	return ent
+}
+
+// remove takes ent out of the queue if it is still waiting there,
+// and notifies it that it will not be handled.
+func (rl *RequestLimiter) remove(ent *qent) {
+	rl.mtx.Lock()
+	defer rl.mtx.Unlock()
+	if ent.heappos < 0 {
+		// Already dequeued (admitted or evicted) -- nothing to do.
+		return
+	}
+	rl.queue.remove(ent.heappos)
+	ent.heappos = -1
+	ent.ready <- false
+}
+
+func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ rl.setupOnce.Do(rl.setup)
+ ent := rl.enqueue(req)
+ var ok bool