Merge branch '21285-max-gw-tunnels'
[arvados.git] / sdk / go / httpserver / request_limiter_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package httpserver
6
7 import (
8         "fmt"
9         "net/http"
10         "net/http/httptest"
11         "strconv"
12         "sync"
13         "time"
14
15         check "gopkg.in/check.v1"
16 )
17
// testHandler is an http.Handler whose progress is driven by the test:
// every call to ServeHTTP announces itself on inHandler and then waits
// for a value on okToProceed before returning.
type testHandler struct {
	// inHandler receives one value each time ServeHTTP is entered.
	inHandler chan struct{}
	// okToProceed must deliver one value for each ServeHTTP call that
	// is allowed to return.
	okToProceed chan struct{}
}

// ServeHTTP signals on inHandler that a request has arrived, then
// blocks until a value is received from okToProceed. It writes nothing
// to resp and does not inspect req.
func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	h.inHandler <- struct{}{}
	<-h.okToProceed
}

// newTestHandler returns a testHandler whose two channels are
// unbuffered, so each signal is a rendezvous between the handler and
// the test.
func newTestHandler() *testHandler {
	h := new(testHandler)
	h.inHandler = make(chan struct{})
	h.okToProceed = make(chan struct{})
	return h
}
34
35 func (s *Suite) TestRequestLimiter1(c *check.C) {
36         h := newTestHandler()
37         rq := &RequestQueue{
38                 MaxConcurrent: 1}
39         l := RequestLimiter{
40                 Queue:   func(*http.Request) *RequestQueue { return rq },
41                 Handler: h}
42         var wg sync.WaitGroup
43         resps := make([]*httptest.ResponseRecorder, 10)
44         for i := 0; i < 10; i++ {
45                 wg.Add(1)
46                 resps[i] = httptest.NewRecorder()
47                 go func(i int) {
48                         l.ServeHTTP(resps[i], &http.Request{})
49                         wg.Done()
50                 }(i)
51         }
52         done := make(chan struct{})
53         go func() {
54                 // Make sure one request has entered the handler
55                 <-h.inHandler
56                 // Make sure all unsuccessful requests finish (but don't wait
57                 // for the one that's still waiting for okToProceed)
58                 wg.Add(-1)
59                 wg.Wait()
60                 // Wait for the last goroutine
61                 wg.Add(1)
62                 h.okToProceed <- struct{}{}
63                 wg.Wait()
64                 done <- struct{}{}
65         }()
66         select {
67         case <-done:
68         case <-time.After(10 * time.Second):
69                 c.Fatal("test timed out, probably deadlocked")
70         }
71         n200 := 0
72         n503 := 0
73         for i := 0; i < 10; i++ {
74                 switch resps[i].Code {
75                 case 200:
76                         n200++
77                 case 503:
78                         n503++
79                 default:
80                         c.Fatalf("Unexpected response code %d", resps[i].Code)
81                 }
82         }
83         if n200 != 1 || n503 != 9 {
84                 c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
85         }
86         // Now that all 10 are finished, an 11th request should
87         // succeed.
88         go func() {
89                 <-h.inHandler
90                 h.okToProceed <- struct{}{}
91         }()
92         resp := httptest.NewRecorder()
93         l.ServeHTTP(resp, &http.Request{})
94         if resp.Code != 200 {
95                 c.Errorf("Got status %d on 11th request, want 200", resp.Code)
96         }
97 }
98
99 func (*Suite) TestRequestLimiter10(c *check.C) {
100         h := newTestHandler()
101         rq := &RequestQueue{
102                 MaxConcurrent: 10}
103         l := RequestLimiter{
104                 Queue:   func(*http.Request) *RequestQueue { return rq },
105                 Handler: h}
106         var wg sync.WaitGroup
107         for i := 0; i < 10; i++ {
108                 wg.Add(1)
109                 go func() {
110                         l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
111                         wg.Done()
112                 }()
113                 // Make sure the handler starts before we initiate the
114                 // next request, but don't let it finish yet.
115                 <-h.inHandler
116         }
117         for i := 0; i < 10; i++ {
118                 h.okToProceed <- struct{}{}
119         }
120         wg.Wait()
121 }
122
123 func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
124         h := newTestHandler()
125         rq := &RequestQueue{
126                 MaxConcurrent: 1000,
127                 MaxQueue:      200,
128         }
129         rl := RequestLimiter{
130                 Handler: h,
131                 Queue:   func(*http.Request) *RequestQueue { return rq },
132                 Priority: func(r *http.Request, _ time.Time) int64 {
133                         p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
134                         return p
135                 }}
136
137         c.Logf("starting initial requests")
138         for i := 0; i < rq.MaxConcurrent; i++ {
139                 go func() {
140                         rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
141                 }()
142         }
143         c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
144         for i := 0; i < rq.MaxConcurrent; i++ {
145                 <-h.inHandler
146         }
147
148         c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rq.MaxQueue)
149         var wgX sync.WaitGroup
150         for i := 0; i < rq.MaxQueue; i++ {
151                 wgX.Add(1)
152                 go func() {
153                         defer wgX.Done()
154                         resp := httptest.NewRecorder()
155                         rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
156                         c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
157                 }()
158         }
159         wgX.Wait()
160
161         c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rq.MaxQueue)
162         // Usage docs say the caller isn't allowed to change fields
163         // after first use, but we secretly know it's OK to change
164         // this field on the fly as long as no requests are arriving
165         // concurrently.
166         rq.MaxQueueTimeForMinPriority = time.Millisecond * 100
167         for i := 0; i < rq.MaxQueue; i++ {
168                 wgX.Add(1)
169                 go func() {
170                         defer wgX.Done()
171                         resp := httptest.NewRecorder()
172                         t0 := time.Now()
173                         rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
174                         c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
175                         elapsed := time.Since(t0)
176                         c.Check(elapsed > rq.MaxQueueTimeForMinPriority, check.Equals, true)
177                         c.Check(elapsed < rq.MaxQueueTimeForMinPriority*10, check.Equals, true)
178                 }()
179         }
180         wgX.Wait()
181
182         c.Logf("starting %d priority=1 and %d priority=1 requests", rq.MaxQueue, rq.MaxQueue)
183         var wg1, wg2 sync.WaitGroup
184         wg1.Add(rq.MaxQueue)
185         wg2.Add(rq.MaxQueue)
186         for i := 0; i < rq.MaxQueue*2; i++ {
187                 i := i
188                 go func() {
189                         pri := (i & 1) + 1
190                         resp := httptest.NewRecorder()
191                         rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
192                         if pri == 1 {
193                                 c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
194                                 wg1.Done()
195                         } else {
196                                 c.Check(resp.Code, check.Equals, http.StatusOK)
197                                 wg2.Done()
198                         }
199                 }()
200         }
201
202         c.Logf("waiting for queued priority=1 requests to fail")
203         wg1.Wait()
204
205         c.Logf("allowing initial requests to proceed")
206         for i := 0; i < rq.MaxConcurrent; i++ {
207                 h.okToProceed <- struct{}{}
208         }
209
210         c.Logf("allowing queued priority=2 requests to proceed")
211         for i := 0; i < rq.MaxQueue; i++ {
212                 <-h.inHandler
213                 h.okToProceed <- struct{}{}
214         }
215         c.Logf("waiting for queued priority=2 requests to succeed")
216         wg2.Wait()
217 }