X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1c92ba23521c23d00f97fee0afa2649afd73b504..d9592ca6a1a9da3adb5c7a5b8201765dad3af93b:/sdk/go/httpserver/request_limiter_test.go diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go index bdc0401b38..7366e1426b 100644 --- a/sdk/go/httpserver/request_limiter_test.go +++ b/sdk/go/httpserver/request_limiter_test.go @@ -6,7 +6,6 @@ package httpserver import ( "fmt" - "math" "net/http" "net/http/httptest" "strconv" @@ -35,7 +34,11 @@ func newTestHandler() *testHandler { func (s *Suite) TestRequestLimiter1(c *check.C) { h := newTestHandler() - l := RequestLimiter{MaxConcurrent: 1, Handler: h} + rq := &RequestQueue{ + MaxConcurrent: 1} + l := RequestLimiter{ + Queue: func(*http.Request) *RequestQueue { return rq }, + Handler: h} var wg sync.WaitGroup resps := make([]*httptest.ResponseRecorder, 10) for i := 0; i < 10; i++ { @@ -95,7 +98,11 @@ func (s *Suite) TestRequestLimiter1(c *check.C) { func (*Suite) TestRequestLimiter10(c *check.C) { h := newTestHandler() - l := RequestLimiter{MaxConcurrent: 10, Handler: h} + rq := &RequestQueue{ + MaxConcurrent: 10} + l := RequestLimiter{ + Queue: func(*http.Request) *RequestQueue { return rq }, + Handler: h} var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) @@ -115,44 +122,68 @@ func (*Suite) TestRequestLimiter10(c *check.C) { func (*Suite) TestRequestLimiterQueuePriority(c *check.C) { h := newTestHandler() + rq := &RequestQueue{ + MaxConcurrent: 1000, + MaxQueue: 200, + } rl := RequestLimiter{ - MaxConcurrent: 100, - MaxQueue: 20, - Handler: h, + Handler: h, + Queue: func(*http.Request) *RequestQueue { return rq }, Priority: func(r *http.Request, _ time.Time) int64 { p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64) return p }} c.Logf("starting initial requests") - for i := 0; i < rl.MaxConcurrent; i++ { + for i := 0; i < rq.MaxConcurrent; i++ { go func() { rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}}) }() } c.Logf("waiting for initial requests to consume all MaxConcurrent slots") - for i := 0; i < rl.MaxConcurrent; i++ { + for i := 0; i < rq.MaxConcurrent; i++ { <-h.inHandler } - c.Logf("starting %d priority=MinInt64 requests (should respond 503 immediately)", rl.MaxQueue) + c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rq.MaxQueue) var wgX sync.WaitGroup - for i := 0; i < rl.MaxQueue; i++ { + for i := 0; i < rq.MaxQueue; i++ { + wgX.Add(1) + go func() { + defer wgX.Done() + resp := httptest.NewRecorder() + rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}}) + c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable) + }() + } + wgX.Wait() + + c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rq.MaxQueue) + // Usage docs say the caller isn't allowed to change fields + // after first use, but we secretly know it's OK to change + // this field on the fly as long as no requests are arriving + // concurrently. + rq.MaxQueueTimeForMinPriority = time.Millisecond * 100 + for i := 0; i < rq.MaxQueue; i++ { wgX.Add(1) go func() { defer wgX.Done() resp := httptest.NewRecorder() - rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", math.MinInt64)}}}) + t0 := time.Now() + rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}}) c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable) + elapsed := time.Since(t0) + c.Check(elapsed > rq.MaxQueueTimeForMinPriority, check.Equals, true) + c.Check(elapsed < rq.MaxQueueTimeForMinPriority*10, check.Equals, true) }() } wgX.Wait() - c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue) + c.Logf("starting %d priority=1 and %d priority=1 requests", rq.MaxQueue, rq.MaxQueue) var wg1, wg2 sync.WaitGroup - wg1.Add(rl.MaxQueue) - wg2.Add(rl.MaxQueue) - for i := 0; i < rl.MaxQueue*2; i++ { + wg1.Add(rq.MaxQueue) + wg2.Add(rq.MaxQueue) + for i := 0; i < rq.MaxQueue*2; i++ { i := i go func() { pri := (i & 1) + 1 @@ -172,12 +203,12 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) { wg1.Wait() c.Logf("allowing initial requests to proceed") - for i := 0; i < rl.MaxConcurrent; i++ { + for i := 0; i < rq.MaxConcurrent; i++ { h.okToProceed <- struct{}{} } c.Logf("allowing queued priority=2 requests to proceed") - for i := 0; i < rl.MaxQueue; i++ { + for i := 0; i < rq.MaxQueue; i++ { <-h.inHandler h.okToProceed <- struct{}{} }