21285: Use separate request limit/queue for gateway tunnel requests.
[arvados.git] / lib / service / cmd_test.go
index 08b3a239dc2583c5da4271465cc550521dc54a79..0266752f383ef861802fe0ad718f1e35d1e0ba9d 100644 (file)
@@ -17,6 +17,8 @@ import (
        "net/url"
        "os"
        "strings"
+       "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -198,15 +200,15 @@ func (*Suite) TestCommand(c *check.C) {
        c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
 }
 
-func (s *Suite) TestDumpRequestsKeepweb(c *check.C) {
-       s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
+func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
+       s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
 }
 
-func (s *Suite) TestDumpRequestsController(c *check.C) {
-       s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
+func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) {
+       s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
 }
 
-func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
+func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
        defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
        requestQueueDumpCheckInterval = time.Second / 10
 
@@ -218,6 +220,7 @@ func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxR
        defer cf.Close()
 
        max := 24
+       maxTunnels := 30
        fmt.Fprintf(cf, `
 Clusters:
  zzzzz:
@@ -225,7 +228,8 @@ Clusters:
   ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
   API:
    `+maxReqsConfigKey+`: %d
-   MaxQueuedRequests: 0
+   MaxQueuedRequests: 1
+   MaxGatewayTunnels: %d
   SystemLogs: {RequestQueueDumpDirectory: %q}
   Services:
    Controller:
@@ -234,14 +238,18 @@ Clusters:
    WebDAV:
     ExternalURL: "http://localhost:`+port+`"
     InternalURLs: {"http://localhost:`+port+`": {}}
-`, max, tmpdir)
+`, max, maxTunnels, tmpdir)
        cf.Close()
 
        started := make(chan bool, max+1)
        hold := make(chan bool)
        handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               started <- true
-               <-hold
+               if strings.HasPrefix(r.URL.Path, "/arvados/v1/connect") {
+                       <-hold
+               } else {
+                       started <- true
+                       <-hold
+               }
        })
        healthCheck := make(chan bool, 1)
        ctx, cancel := context.WithCancel(context.Background())
@@ -267,15 +275,58 @@ Clusters:
        }
        client := http.Client{}
        deadline := time.Now().Add(time.Second * 2)
+       var activeReqs sync.WaitGroup
+
+       // Start some API reqs
+       var apiResp200, apiResp503 int64
        for i := 0; i < max+1; i++ {
+               activeReqs.Add(1)
                go func() {
+                       defer activeReqs.Done()
                        resp, err := client.Get("http://localhost:" + port + "/testpath")
                        for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
                                time.Sleep(time.Second / 100)
                                resp, err = client.Get("http://localhost:" + port + "/testpath")
                        }
                        if c.Check(err, check.IsNil) {
-                               c.Logf("resp StatusCode %d", resp.StatusCode)
+                               // Only the status code matters; close
+                               // the body so the connection is released.
+                               resp.Body.Close()
+                               switch resp.StatusCode {
+                               case http.StatusOK:
+                                       atomic.AddInt64(&apiResp200, 1)
+                               case http.StatusServiceUnavailable:
+                                       atomic.AddInt64(&apiResp503, 1)
+                               default:
+                                       c.Errorf("api response code %d", resp.StatusCode)
+                               }
+                       }
+               }()
+       }
+
+       // Start some gateway tunnel reqs that don't count toward our
+       // API req limit
+       extraTunnelReqs := 20
+       var tunnelResp200, tunnelResp503 int64
+       for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
+               activeReqs.Add(1)
+               go func() {
+                       defer activeReqs.Done()
+                       resp, err := client.Get("http://localhost:" + port + "/arvados/v1/connect/...")
+                       for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
+                               time.Sleep(time.Second / 100)
+                               resp, err = client.Get("http://localhost:" + port + "/arvados/v1/connect/...")
+                       }
+                       if c.Check(err, check.IsNil) {
+                               resp.Body.Close()
+                               switch resp.StatusCode {
+                               case http.StatusOK:
+                                       atomic.AddInt64(&tunnelResp200, 1)
+                               case http.StatusServiceUnavailable:
+                                       atomic.AddInt64(&tunnelResp503, 1)
+                               default:
+                                       c.Errorf("tunnel response code %d", resp.StatusCode)
+                               }
                        }
                }()
        }
@@ -300,6 +343,18 @@ Clusters:
                var loaded []struct{ URL string }
                err = json.Unmarshal(j, &loaded)
                c.Check(err, check.IsNil)
+
+               // Filter out gateway tunnel reqs, which don't count
+               // toward our API req limit. Filtering in place reuses
+               // loaded's backing array.
+               apiReqs := loaded[:0]
+               for _, req := range loaded {
+                       if !strings.HasPrefix(req.URL, "/arvados/v1/connect/") {
+                               apiReqs = append(apiReqs, req)
+                       }
+               }
+               loaded = apiReqs
+
                if len(loaded) < max {
                        // Dumped when #requests was >90% but <100% of
                        // limit. If we stop now, we won't be able to
@@ -309,7 +366,7 @@ Clusters:
                        c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
                        continue
                }
-               c.Check(loaded, check.HasLen, max)
+               c.Check(loaded, check.HasLen, max+1)
                c.Check(loaded[0].URL, check.Equals, "/testpath")
                break
        }
@@ -328,7 +385,8 @@ Clusters:
                c.Check(err, check.IsNil)
                switch path {
                case "/metrics":
-                       c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
+                       c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`)
+                       c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`)
                case "/_inspect/requests":
                        c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
                default:
@@ -336,6 +394,11 @@ Clusters:
                }
        }
        close(hold)
+       activeReqs.Wait()
+       c.Check(int(apiResp200), check.Equals, max+1)
+       c.Check(int(apiResp503), check.Equals, 0)
+       c.Check(int(tunnelResp200), check.Equals, maxTunnels)
+       c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs)
        cancel()
 }