X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/095e176632bbf81d28a239742a1ecce12404bd2d..HEAD:/lib/service/cmd_test.go diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go index 08b3a239dc..9ead90019e 100644 --- a/lib/service/cmd_test.go +++ b/lib/service/cmd_test.go @@ -17,6 +17,8 @@ import ( "net/url" "os" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -198,15 +200,24 @@ func (*Suite) TestCommand(c *check.C) { c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`) } -func (s *Suite) TestDumpRequestsKeepweb(c *check.C) { - s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests") +func (s *Suite) TestTunnelPathRegexp(c *check.C) { + c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, false) + c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`), check.Equals, false) } -func (s *Suite) TestDumpRequestsController(c *check.C) { - s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests") +func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) { + s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests") } -func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) { +func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) { + s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests") +} + +func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) { defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval) requestQueueDumpCheckInterval = time.Second / 10 @@ -218,6 +229,7 @@ func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxR defer cf.Close() max := 24 + maxTunnels := 30 fmt.Fprintf(cf, ` Clusters: zzzzz: @@ -225,7 +237,8 @@ Clusters: ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb API: `+maxReqsConfigKey+`: %d - MaxQueuedRequests: 0 + MaxQueuedRequests: 1 + MaxGatewayTunnels: %d SystemLogs: {RequestQueueDumpDirectory: %q} Services: Controller: @@ -234,14 +247,18 @@ Clusters: WebDAV: ExternalURL: "http://localhost:`+port+`" InternalURLs: {"http://localhost:`+port+`": {}} -`, max, tmpdir) +`, max, maxTunnels, tmpdir) cf.Close() started := make(chan bool, max+1) hold := make(chan bool) handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - started <- true - <-hold + if strings.Contains(r.URL.Path, "/ssh") || strings.Contains(r.URL.Path, "/gateway_tunnel") { + <-hold + } else { + started <- true + <-hold + } }) healthCheck := make(chan bool, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -267,15 +284,59 @@ Clusters: } client := http.Client{} deadline := time.Now().Add(time.Second * 2) + var activeReqs sync.WaitGroup + + // Start some API reqs + var apiResp200, apiResp503 int64 for i := 0; i < max+1; i++ { + activeReqs.Add(1) + go func() { + defer activeReqs.Done() + target := "http://localhost:" + port + "/testpath" + resp, err := client.Get(target) + for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) { + time.Sleep(time.Second / 100) + resp, err = client.Get(target) + } + if c.Check(err, check.IsNil) { + if resp.StatusCode == http.StatusOK { + atomic.AddInt64(&apiResp200, 1) + } else if resp.StatusCode == http.StatusServiceUnavailable { + atomic.AddInt64(&apiResp503, 1) + } + } + }() + } + + // Start some gateway tunnel reqs that don't count toward our + // API req limit + extraTunnelReqs := 20 + var tunnelResp200, tunnelResp503 int64 + var paths = []string{ + "/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + "/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + "/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + "/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + } + for i := 0; i < maxTunnels+extraTunnelReqs; i++ { + i := i + activeReqs.Add(1) go func() { - resp, err := client.Get("http://localhost:" + port + "/testpath") + defer activeReqs.Done() + target := "http://localhost:" + port + paths[i%len(paths)] + resp, err := client.Post(target, "application/octet-stream", nil) for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) { time.Sleep(time.Second / 100) - resp, err = client.Get("http://localhost:" + port + "/testpath") + resp, err = client.Post(target, "application/octet-stream", nil) } if c.Check(err, check.IsNil) { - c.Logf("resp StatusCode %d", resp.StatusCode) + if resp.StatusCode == http.StatusOK { + atomic.AddInt64(&tunnelResp200, 1) + } else if resp.StatusCode == http.StatusServiceUnavailable { + atomic.AddInt64(&tunnelResp503, 1) + } else { + c.Errorf("tunnel response code %d", resp.StatusCode) + } } }() } @@ -284,6 +345,10 @@ Clusters: case <-started: case <-time.After(time.Second): c.Logf("%s", stderr.String()) + c.Logf("apiResp200 %d", apiResp200) + c.Logf("apiResp503 %d", apiResp503) + c.Logf("tunnelResp200 %d", tunnelResp200) + c.Logf("tunnelResp503 %d", tunnelResp503) c.Fatal("timed out") } } @@ -300,6 +365,20 @@ Clusters: var loaded []struct{ URL string } err = json.Unmarshal(j, &loaded) c.Check(err, check.IsNil) + + for i := 0; i < len(loaded); i++ { + if strings.Contains(loaded[i].URL, "/ssh") || strings.Contains(loaded[i].URL, "/gateway_tunnel") { + // Filter out a gateway tunnel req + // that doesn't count toward our API + // req limit + if i < len(loaded)-1 { + copy(loaded[i:], loaded[i+1:]) + i-- + } + loaded = loaded[:len(loaded)-1] + } + } + if len(loaded) < max { // Dumped when #requests was >90% but <100% of // limit. If we stop now, we won't be able to @@ -309,7 +388,7 @@ Clusters: c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max) continue } - c.Check(loaded, check.HasLen, max) + c.Check(loaded, check.HasLen, max+1) c.Check(loaded[0].URL, check.Equals, "/testpath") break } @@ -328,7 +407,8 @@ Clusters: c.Check(err, check.IsNil) switch path { case "/metrics": - c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`) + c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`) + c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`) case "/_inspect/requests": c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`) default: @@ -336,6 +416,11 @@ Clusters: } } close(hold) + activeReqs.Wait() + c.Check(int(apiResp200), check.Equals, max+1) + c.Check(int(apiResp503), check.Equals, 0) + c.Check(int(tunnelResp200), check.Equals, maxTunnels) + c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs) cancel() }