1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
15 check "gopkg.in/check.v1"
// testHandler is the http.Handler used by the tests in this file. Its
// two channels let a test observe when a request has entered the
// handler and signal when a request may continue.
18 type testHandler struct {
// inHandler is sent to by ServeHTTP when a request arrives.
19 inHandler chan struct{}
// okToProceed is sent to by the tests to release a request.
// NOTE(review): presumably ServeHTTP receives from this channel
// before responding -- confirm.
20 okToProceed chan struct{}
// ServeHTTP reports on inHandler that a request has reached the
// handler. newTestHandler creates inHandler unbuffered, so this send
// blocks until a receiver takes the value.
23 func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
24 h.inHandler <- struct{}{}
// newTestHandler returns a *testHandler whose inHandler and
// okToProceed channels are both unbuffered, so every send on either
// channel waits for a matching receive.
28 func newTestHandler() *testHandler {
30 inHandler: make(chan struct{}),
31 okToProceed: make(chan struct{}),
// TestRequestLimiter1 sends 10 requests through the limiter and
// expects exactly one 200 response and nine 503 responses. Once those
// have finished, it checks that an 11th request gets a 200.
// NOTE(review): presumably the queue is configured to admit a single
// concurrent request -- confirm against the queue setup.
35 func (s *Suite) TestRequestLimiter1(c *check.C) {
// Every request is routed to the same queue, rq.
40 Queue: func(*http.Request) *RequestQueue { return rq },
// One recorder per request, so each response code can be inspected
// after the requests finish.
43 resps := make([]*httptest.ResponseRecorder, 10)
44 for i := 0; i < 10; i++ {
46 resps[i] = httptest.NewRecorder()
48 l.ServeHTTP(resps[i], &http.Request{})
52 done := make(chan struct{})
54 // Make sure one request has entered the handler
56 // Make sure all unsuccessful requests finish (but don't wait
57 // for the one that's still waiting for okToProceed)
60 // Wait for the last goroutine
// Release the one request that is still waiting in the handler.
62 h.okToProceed <- struct{}{}
// Fail the test instead of hanging if the wait never completes.
68 case <-time.After(10 * time.Second):
69 c.Fatal("test timed out, probably deadlocked")
// Tally the response codes of all 10 requests; any code other than
// the expected ones fails the test immediately.
73 for i := 0; i < 10; i++ {
74 switch resps[i].Code {
80 c.Fatalf("Unexpected response code %d", resps[i].Code)
83 if n200 != 1 || n503 != 9 {
84 c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
86 // Now that all 10 are finished, an 11th request should
90 h.okToProceed <- struct{}{}
92 resp := httptest.NewRecorder()
93 l.ServeHTTP(resp, &http.Request{})
95 c.Errorf("Got status %d on 11th request, want 200", resp.Code)
// TestRequestLimiter10 starts 10 requests through the limiter, making
// sure each one has entered the handler before the next is started,
// and then releases them with 10 sends on okToProceed.
// NOTE(review): presumably the queue allows 10 concurrent requests
// here -- confirm against the queue setup.
99 func (*Suite) TestRequestLimiter10(c *check.C) {
100 h := newTestHandler()
// Every request is routed to the same queue, rq.
104 Queue: func(*http.Request) *RequestQueue { return rq },
106 var wg sync.WaitGroup
107 for i := 0; i < 10; i++ {
// The response is not inspected in this test, so a throwaway
// recorder is used.
110 l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
113 // Make sure the handler starts before we initiate the
114 // next request, but don't let it finish yet.
// One send per request: each releases a single waiting request.
117 for i := 0; i < 10; i++ {
118 h.okToProceed <- struct{}{}
// TestRequestLimiterQueuePriority exercises the limiter's handling of
// request priority, which is taken from each request's "Priority"
// header. In order, it:
//   - starts rq.MaxConcurrent initial requests to occupy every slot;
//   - sends rq.MaxQueue MinPriority requests and expects 503 for each;
//   - sets MaxQueueTimeForMinPriority to 100 ms, sends rq.MaxQueue more
//     MinPriority requests, and expects each 503 to arrive after that
//     timeout has elapsed;
//   - starts rq.MaxQueue*2 further requests with mixed priorities and
//     expects the priority=1 ones to fail and the priority=2 ones to
//     succeed once the initial requests are released.
123 func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
124 h := newTestHandler()
129 rl := RequestLimiter{
// Every request is routed to the same queue, rq.
131 Queue: func(*http.Request) *RequestQueue { return rq },
132 Priority: func(r *http.Request, _ time.Time) int64 {
// The parse error is deliberately discarded: a request without a
// "Priority" header parses as 0.
133 p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
137 c.Logf("starting initial requests")
138 for i := 0; i < rq.MaxConcurrent; i++ {
// These requests carry no "Priority" header.
140 rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
143 c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
144 for i := 0; i < rq.MaxConcurrent; i++ {
148 c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rq.MaxQueue)
149 var wgX sync.WaitGroup
150 for i := 0; i < rq.MaxQueue; i++ {
154 resp := httptest.NewRecorder()
155 rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
156 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
161 c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rq.MaxQueue)
162 // Usage docs say the caller isn't allowed to change fields
163 // after first use, but we secretly know it's OK to change
164 // this field on the fly as long as no requests are arriving
166 rq.MaxQueueTimeForMinPriority = time.Millisecond * 100
167 for i := 0; i < rq.MaxQueue; i++ {
171 resp := httptest.NewRecorder()
173 rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
174 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
// The 503 must take longer than MaxQueueTimeForMinPriority, but
// less than 10 times that duration.
175 elapsed := time.Since(t0)
176 c.Check(elapsed > rq.MaxQueueTimeForMinPriority, check.Equals, true)
177 c.Check(elapsed < rq.MaxQueueTimeForMinPriority*10, check.Equals, true)
// NOTE(review): the message below says "priority=1" twice; the later
// log messages refer to priority=1 and priority=2 requests, so the
// second one is probably meant to read "priority=2".
182 c.Logf("starting %d priority=1 and %d priority=1 requests", rq.MaxQueue, rq.MaxQueue)
183 var wg1, wg2 sync.WaitGroup
186 for i := 0; i < rq.MaxQueue*2; i++ {
190 resp := httptest.NewRecorder()
191 rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
193 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
196 c.Check(resp.Code, check.Equals, http.StatusOK)
202 c.Logf("waiting for queued priority=1 requests to fail")
205 c.Logf("allowing initial requests to proceed")
// One send per initial request.
206 for i := 0; i < rq.MaxConcurrent; i++ {
207 h.okToProceed <- struct{}{}
210 c.Logf("allowing queued priority=2 requests to proceed")
211 for i := 0; i < rq.MaxQueue; i++ {
213 h.okToProceed <- struct{}{}
215 c.Logf("waiting for queued priority=2 requests to succeed")