Merge branch '20726-s3list-pages'
[arvados.git] / sdk / go / httpserver / request_limiter_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package httpserver
6
7 import (
8         "fmt"
9         "net/http"
10         "net/http/httptest"
11         "strconv"
12         "sync"
13         "time"
14
15         check "gopkg.in/check.v1"
16 )
17
18 type testHandler struct {
19         inHandler   chan struct{}
20         okToProceed chan struct{}
21 }
22
23 func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
24         h.inHandler <- struct{}{}
25         <-h.okToProceed
26 }
27
28 func newTestHandler() *testHandler {
29         return &testHandler{
30                 inHandler:   make(chan struct{}),
31                 okToProceed: make(chan struct{}),
32         }
33 }
34
35 func (s *Suite) TestRequestLimiter1(c *check.C) {
36         h := newTestHandler()
37         l := RequestLimiter{MaxConcurrent: 1, Handler: h}
38         var wg sync.WaitGroup
39         resps := make([]*httptest.ResponseRecorder, 10)
40         for i := 0; i < 10; i++ {
41                 wg.Add(1)
42                 resps[i] = httptest.NewRecorder()
43                 go func(i int) {
44                         l.ServeHTTP(resps[i], &http.Request{})
45                         wg.Done()
46                 }(i)
47         }
48         done := make(chan struct{})
49         go func() {
50                 // Make sure one request has entered the handler
51                 <-h.inHandler
52                 // Make sure all unsuccessful requests finish (but don't wait
53                 // for the one that's still waiting for okToProceed)
54                 wg.Add(-1)
55                 wg.Wait()
56                 // Wait for the last goroutine
57                 wg.Add(1)
58                 h.okToProceed <- struct{}{}
59                 wg.Wait()
60                 done <- struct{}{}
61         }()
62         select {
63         case <-done:
64         case <-time.After(10 * time.Second):
65                 c.Fatal("test timed out, probably deadlocked")
66         }
67         n200 := 0
68         n503 := 0
69         for i := 0; i < 10; i++ {
70                 switch resps[i].Code {
71                 case 200:
72                         n200++
73                 case 503:
74                         n503++
75                 default:
76                         c.Fatalf("Unexpected response code %d", resps[i].Code)
77                 }
78         }
79         if n200 != 1 || n503 != 9 {
80                 c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
81         }
82         // Now that all 10 are finished, an 11th request should
83         // succeed.
84         go func() {
85                 <-h.inHandler
86                 h.okToProceed <- struct{}{}
87         }()
88         resp := httptest.NewRecorder()
89         l.ServeHTTP(resp, &http.Request{})
90         if resp.Code != 200 {
91                 c.Errorf("Got status %d on 11th request, want 200", resp.Code)
92         }
93 }
94
95 func (*Suite) TestRequestLimiter10(c *check.C) {
96         h := newTestHandler()
97         l := RequestLimiter{MaxConcurrent: 10, Handler: h}
98         var wg sync.WaitGroup
99         for i := 0; i < 10; i++ {
100                 wg.Add(1)
101                 go func() {
102                         l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
103                         wg.Done()
104                 }()
105                 // Make sure the handler starts before we initiate the
106                 // next request, but don't let it finish yet.
107                 <-h.inHandler
108         }
109         for i := 0; i < 10; i++ {
110                 h.okToProceed <- struct{}{}
111         }
112         wg.Wait()
113 }
114
115 func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
116         h := newTestHandler()
117         rl := RequestLimiter{
118                 MaxConcurrent: 1000,
119                 MaxQueue:      200,
120                 Handler:       h,
121                 Priority: func(r *http.Request, _ time.Time) int64 {
122                         p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
123                         return p
124                 }}
125
126         c.Logf("starting initial requests")
127         for i := 0; i < rl.MaxConcurrent; i++ {
128                 go func() {
129                         rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
130                 }()
131         }
132         c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
133         for i := 0; i < rl.MaxConcurrent; i++ {
134                 <-h.inHandler
135         }
136
137         c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
138         var wgX sync.WaitGroup
139         for i := 0; i < rl.MaxQueue; i++ {
140                 wgX.Add(1)
141                 go func() {
142                         defer wgX.Done()
143                         resp := httptest.NewRecorder()
144                         rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
145                         c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
146                 }()
147         }
148         wgX.Wait()
149
150         c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
151         // Usage docs say the caller isn't allowed to change fields
152         // after first use, but we secretly know it's OK to change
153         // this field on the fly as long as no requests are arriving
154         // concurrently.
155         rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
156         for i := 0; i < rl.MaxQueue; i++ {
157                 wgX.Add(1)
158                 go func() {
159                         defer wgX.Done()
160                         resp := httptest.NewRecorder()
161                         t0 := time.Now()
162                         rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
163                         c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
164                         elapsed := time.Since(t0)
165                         c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
166                         c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
167                 }()
168         }
169         wgX.Wait()
170
171         c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
172         var wg1, wg2 sync.WaitGroup
173         wg1.Add(rl.MaxQueue)
174         wg2.Add(rl.MaxQueue)
175         for i := 0; i < rl.MaxQueue*2; i++ {
176                 i := i
177                 go func() {
178                         pri := (i & 1) + 1
179                         resp := httptest.NewRecorder()
180                         rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
181                         if pri == 1 {
182                                 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
183                                 wg1.Done()
184                         } else {
185                                 c.Check(resp.Code, check.Equals, http.StatusOK)
186                                 wg2.Done()
187                         }
188                 }()
189         }
190
191         c.Logf("waiting for queued priority=1 requests to fail")
192         wg1.Wait()
193
194         c.Logf("allowing initial requests to proceed")
195         for i := 0; i < rl.MaxConcurrent; i++ {
196                 h.okToProceed <- struct{}{}
197         }
198
199         c.Logf("allowing queued priority=2 requests to proceed")
200         for i := 0; i < rl.MaxQueue; i++ {
201                 <-h.inHandler
202                 h.okToProceed <- struct{}{}
203         }
204         c.Logf("waiting for queued priority=2 requests to succeed")
205         wg2.Wait()
206 }