Merge branch '20602-controller-qos'
[arvados.git] / sdk / go / httpserver / request_limiter_test.go
index 9258fbfa58f4b5a4867651fc15aba4e9b9616dcf..b04ff57cc176fca18d58773565cee1a55f3a5357 100644 (file)
@@ -5,11 +5,14 @@
 package httpserver
 
 import (
+       "fmt"
        "net/http"
        "net/http/httptest"
+       "strconv"
        "sync"
-       "testing"
        "time"
+
+       check "gopkg.in/check.v1"
 )
 
 type testHandler struct {
@@ -29,9 +32,9 @@ func newTestHandler() *testHandler {
        }
 }
 
-func TestRequestLimiter1(t *testing.T) {
+func (s *Suite) TestRequestLimiter1(c *check.C) {
        h := newTestHandler()
-       l := NewRequestLimiter(1, h, nil)
+       l := RequestLimiter{MaxConcurrent: 1, Handler: h}
        var wg sync.WaitGroup
        resps := make([]*httptest.ResponseRecorder, 10)
        for i := 0; i < 10; i++ {
@@ -59,7 +62,7 @@ func TestRequestLimiter1(t *testing.T) {
        select {
        case <-done:
        case <-time.After(10 * time.Second):
-               t.Fatal("test timed out, probably deadlocked")
+               c.Fatal("test timed out, probably deadlocked")
        }
        n200 := 0
        n503 := 0
@@ -70,11 +73,11 @@ func TestRequestLimiter1(t *testing.T) {
                case 503:
                        n503++
                default:
-                       t.Fatalf("Unexpected response code %d", resps[i].Code)
+                       c.Fatalf("Unexpected response code %d", resps[i].Code)
                }
        }
        if n200 != 1 || n503 != 9 {
-               t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+               c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
        }
        // Now that all 10 are finished, an 11th request should
        // succeed.
@@ -85,13 +88,13 @@ func TestRequestLimiter1(t *testing.T) {
        resp := httptest.NewRecorder()
        l.ServeHTTP(resp, &http.Request{})
        if resp.Code != 200 {
-               t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+               c.Errorf("Got status %d on 11th request, want 200", resp.Code)
        }
 }
 
-func TestRequestLimiter10(t *testing.T) {
+func (*Suite) TestRequestLimiter10(c *check.C) {
        h := newTestHandler()
-       l := NewRequestLimiter(10, h, nil)
+       l := RequestLimiter{MaxConcurrent: 10, Handler: h}
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
                wg.Add(1)
@@ -108,3 +111,75 @@ func TestRequestLimiter10(t *testing.T) {
        }
        wg.Wait()
 }
+
+func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
+       h := newTestHandler()
+       rl := RequestLimiter{
+               MaxConcurrent: 1000,
+               MaxQueue:      200,
+               Handler:       h,
+               Priority: func(r *http.Request, _ time.Time) int64 {
+                       p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
+                       return p
+               }}
+
+       c.Logf("starting initial requests")
+       for i := 0; i < rl.MaxConcurrent; i++ {
+               go func() {
+                       rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
+               }()
+       }
+       c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
+       for i := 0; i < rl.MaxConcurrent; i++ {
+               <-h.inHandler
+       }
+
+       c.Logf("starting %d priority=IneligibleForQueuePriority requests (should respond 503 immediately)", rl.MaxQueue)
+       var wgX sync.WaitGroup
+       for i := 0; i < rl.MaxQueue; i++ {
+               wgX.Add(1)
+               go func() {
+                       defer wgX.Done()
+                       resp := httptest.NewRecorder()
+                       rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", IneligibleForQueuePriority)}}})
+                       c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+               }()
+       }
+       wgX.Wait()
+
+       c.Logf("starting %d priority=1 and %d priority=2 requests", rl.MaxQueue, rl.MaxQueue)
+       var wg1, wg2 sync.WaitGroup
+       wg1.Add(rl.MaxQueue)
+       wg2.Add(rl.MaxQueue)
+       for i := 0; i < rl.MaxQueue*2; i++ {
+               i := i
+               go func() {
+                       pri := (i & 1) + 1
+                       resp := httptest.NewRecorder()
+                       rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
+                       if pri == 1 {
+                               c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+                               wg1.Done()
+                       } else {
+                               c.Check(resp.Code, check.Equals, http.StatusOK)
+                               wg2.Done()
+                       }
+               }()
+       }
+
+       c.Logf("waiting for queued priority=1 requests to fail")
+       wg1.Wait()
+
+       c.Logf("allowing initial requests to proceed")
+       for i := 0; i < rl.MaxConcurrent; i++ {
+               h.okToProceed <- struct{}{}
+       }
+
+       c.Logf("allowing queued priority=2 requests to proceed")
+       for i := 0; i < rl.MaxQueue; i++ {
+               <-h.inHandler
+               h.okToProceed <- struct{}{}
+       }
+       c.Logf("waiting for queued priority=2 requests to succeed")
+       wg2.Wait()
+}