1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
15 check "gopkg.in/check.v1"
18 type testHandler struct {
19 inHandler chan struct{}
20 okToProceed chan struct{}
23 func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
24 h.inHandler <- struct{}{}
28 func newTestHandler() *testHandler {
30 inHandler: make(chan struct{}),
31 okToProceed: make(chan struct{}),
35 func (s *Suite) TestRequestLimiter1(c *check.C) {
37 l := RequestLimiter{MaxConcurrent: 1, Handler: h}
39 resps := make([]*httptest.ResponseRecorder, 10)
40 for i := 0; i < 10; i++ {
42 resps[i] = httptest.NewRecorder()
44 l.ServeHTTP(resps[i], &http.Request{})
48 done := make(chan struct{})
50 // Make sure one request has entered the handler
52 // Make sure all unsuccessful requests finish (but don't wait
53 // for the one that's still waiting for okToProceed)
56 // Wait for the last goroutine
58 h.okToProceed <- struct{}{}
64 case <-time.After(10 * time.Second):
65 c.Fatal("test timed out, probably deadlocked")
69 for i := 0; i < 10; i++ {
70 switch resps[i].Code {
76 c.Fatalf("Unexpected response code %d", resps[i].Code)
79 if n200 != 1 || n503 != 9 {
80 c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
82 // Now that all 10 are finished, an 11th request should
86 h.okToProceed <- struct{}{}
88 resp := httptest.NewRecorder()
89 l.ServeHTTP(resp, &http.Request{})
91 c.Errorf("Got status %d on 11th request, want 200", resp.Code)
95 func (*Suite) TestRequestLimiter10(c *check.C) {
97 l := RequestLimiter{MaxConcurrent: 10, Handler: h}
99 for i := 0; i < 10; i++ {
102 l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
105 // Make sure the handler starts before we initiate the
106 // next request, but don't let it finish yet.
109 for i := 0; i < 10; i++ {
110 h.okToProceed <- struct{}{}
115 func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
116 h := newTestHandler()
117 rl := RequestLimiter{
121 Priority: func(r *http.Request, _ time.Time) int64 {
122 p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
126 c.Logf("starting initial requests")
127 for i := 0; i < rl.MaxConcurrent; i++ {
129 rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
132 c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
133 for i := 0; i < rl.MaxConcurrent; i++ {
137 c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
138 var wgX sync.WaitGroup
139 for i := 0; i < rl.MaxQueue; i++ {
143 resp := httptest.NewRecorder()
144 rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
145 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
150 c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
151 // Usage docs say the caller isn't allowed to change fields
152 // after first use, but we secretly know it's OK to change
153 // this field on the fly as long as no requests are arriving
155 rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
156 for i := 0; i < rl.MaxQueue; i++ {
160 resp := httptest.NewRecorder()
162 rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
163 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
164 elapsed := time.Since(t0)
165 c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
166 c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
171 c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
172 var wg1, wg2 sync.WaitGroup
175 for i := 0; i < rl.MaxQueue*2; i++ {
179 resp := httptest.NewRecorder()
180 rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
182 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
185 c.Check(resp.Code, check.Equals, http.StatusOK)
191 c.Logf("waiting for queued priority=1 requests to fail")
194 c.Logf("allowing initial requests to proceed")
195 for i := 0; i < rl.MaxConcurrent; i++ {
196 h.okToProceed <- struct{}{}
199 c.Logf("allowing queued priority=2 requests to proceed")
200 for i := 0; i < rl.MaxQueue; i++ {
202 h.okToProceed <- struct{}{}
204 c.Logf("waiting for queued priority=2 requests to succeed")