import (
"fmt"
- "math"
"net/http"
"net/http/httptest"
"strconv"
func (s *Suite) TestRequestLimiter1(c *check.C) {
h := newTestHandler()
- l := RequestLimiter{MaxConcurrent: 1, Handler: h}
+ rq := &RequestQueue{
+ MaxConcurrent: 1}
+ l := RequestLimiter{
+ Queue: func(*http.Request) *RequestQueue { return rq },
+ Handler: h}
var wg sync.WaitGroup
resps := make([]*httptest.ResponseRecorder, 10)
for i := 0; i < 10; i++ {
func (*Suite) TestRequestLimiter10(c *check.C) {
h := newTestHandler()
- l := RequestLimiter{MaxConcurrent: 10, Handler: h}
+ rq := &RequestQueue{
+ MaxConcurrent: 10}
+ l := RequestLimiter{
+ Queue: func(*http.Request) *RequestQueue { return rq },
+ Handler: h}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
h := newTestHandler()
+ rq := &RequestQueue{
+ MaxConcurrent: 1000,
+ MaxQueue: 200,
+ }
rl := RequestLimiter{
- MaxConcurrent: 100,
- MaxQueue: 20,
- Handler: h,
+ Handler: h,
+ Queue: func(*http.Request) *RequestQueue { return rq },
Priority: func(r *http.Request, _ time.Time) int64 {
p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
return p
}}
c.Logf("starting initial requests")
- for i := 0; i < rl.MaxConcurrent; i++ {
+ for i := 0; i < rq.MaxConcurrent; i++ {
go func() {
rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
}()
}
c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
- for i := 0; i < rl.MaxConcurrent; i++ {
+ for i := 0; i < rq.MaxConcurrent; i++ {
<-h.inHandler
}
- c.Logf("starting %d priority=MinInt64 requests (should respond 503 immediately)", rl.MaxQueue)
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rq.MaxQueue)
var wgX sync.WaitGroup
- for i := 0; i < rl.MaxQueue; i++ {
+ for i := 0; i < rq.MaxQueue; i++ {
+ wgX.Add(1)
+ go func() {
+ defer wgX.Done()
+ resp := httptest.NewRecorder()
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
+ c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+ }()
+ }
+ wgX.Wait()
+
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rq.MaxQueue)
+ // Usage docs say the caller isn't allowed to change fields
+ // after first use, but we secretly know it's OK to change
+ // this field on the fly as long as no requests are arriving
+ // concurrently.
+ rq.MaxQueueTimeForMinPriority = time.Millisecond * 100
+ for i := 0; i < rq.MaxQueue; i++ {
wgX.Add(1)
go func() {
defer wgX.Done()
resp := httptest.NewRecorder()
- rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", math.MinInt64)}}})
+ t0 := time.Now()
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+ elapsed := time.Since(t0)
+ c.Check(elapsed > rq.MaxQueueTimeForMinPriority, check.Equals, true)
+ c.Check(elapsed < rq.MaxQueueTimeForMinPriority*10, check.Equals, true)
}()
}
wgX.Wait()
- c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
+ c.Logf("starting %d priority=1 and %d priority=1 requests", rq.MaxQueue, rq.MaxQueue)
var wg1, wg2 sync.WaitGroup
- wg1.Add(rl.MaxQueue)
- wg2.Add(rl.MaxQueue)
- for i := 0; i < rl.MaxQueue*2; i++ {
+ wg1.Add(rq.MaxQueue)
+ wg2.Add(rq.MaxQueue)
+ for i := 0; i < rq.MaxQueue*2; i++ {
i := i
go func() {
pri := (i & 1) + 1
wg1.Wait()
c.Logf("allowing initial requests to proceed")
- for i := 0; i < rl.MaxConcurrent; i++ {
+ for i := 0; i < rq.MaxConcurrent; i++ {
h.okToProceed <- struct{}{}
}
c.Logf("allowing queued priority=2 requests to proceed")
- for i := 0; i < rl.MaxQueue; i++ {
+ for i := 0; i < rq.MaxQueue; i++ {
<-h.inHandler
h.okToProceed <- struct{}{}
}