21285: Send all SSH and tunnel endpoints to tunnel queue.
author Tom Clegg <tom@curii.com>
Thu, 28 Dec 2023 18:31:43 +0000 (13:31 -0500)
committer Tom Clegg <tom@curii.com>
Thu, 28 Dec 2023 18:31:43 +0000 (13:31 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/service/cmd.go
lib/service/cmd_test.go

index e40b47acbbb6d4bb1b96a9de1bf2d625e28b2430..82e95fe0b4c38b8ab0e7cfa49ab6c17da386da00 100644 (file)
@@ -16,6 +16,7 @@ import (
        _ "net/http/pprof"
        "net/url"
        "os"
+       "regexp"
        "strings"
        "time"
 
@@ -297,7 +298,7 @@ func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster,
                Priority: c.requestPriority,
                Registry: reg,
                Queue: func(req *http.Request) *httpserver.RequestQueue {
-                       if strings.HasPrefix(req.URL.Path, "/arvados/v1/connect/") {
+                       if req.Method == http.MethodPost && reTunnelPath.MatchString(req.URL.Path) {
                                return rqTunnel
                        } else {
                                return rqAPI
@@ -306,6 +307,25 @@ func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster,
        }
 }
 
+// reTunnelPath matches paths of API endpoints that go in the "tunnel"
+// queue.
+//
+// The pattern is assembled by the function literal below from the
+// Path of each listed endpoint: a leading "/" is prepended, each
+// "{...}" placeholder is replaced with [^/]* (any run of non-slash
+// characters, possibly empty), and all remaining text is matched
+// literally. The per-endpoint alternatives are joined with "|" and
+// wrapped in ^(...)$, so only a complete path matches.
+var reTunnelPath = regexp.MustCompile(func() string {
+       // Non-greedy match, so each {placeholder} is replaced on its
+       // own rather than everything between the first "{" and the
+       // last "}".
+       rePathVar := regexp.MustCompile(`{.*?}`)
+       out := ""
+       for _, endpoint := range []arvados.APIEndpoint{
+               arvados.EndpointContainerGatewayTunnel,
+               arvados.EndpointContainerGatewayTunnelCompat,
+               arvados.EndpointContainerSSH,
+               arvados.EndpointContainerSSHCompat,
+       } {
+               if out != "" {
+                       out += "|"
+               }
+               // \Q...\E quotes the literal parts of the path. Each
+               // placeholder closes the quoted span (\E), inserts
+               // [^/]*, and opens a new quoted span (\Q).
+               out += `\Q/` + rePathVar.ReplaceAllString(endpoint.Path, `\E[^/]*\Q`) + `\E`
+       }
+       return "^(" + out + ")$"
+}())
+
 func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
        switch {
        case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
index 0266752f383ef861802fe0ad718f1e35d1e0ba9d..9ead90019e1302917b7f8d60448eb8b3f27d0bf6 100644 (file)
@@ -200,6 +200,15 @@ func (*Suite) TestCommand(c *check.C) {
        c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
 }
 
+// TestTunnelPathRegexp checks reTunnelPath against sample request
+// paths: four gateway_tunnel/ssh paths that must match, and two
+// paths that must not.
+func (s *Suite) TestTunnelPathRegexp(c *check.C) {
+       // Expected matches: gateway_tunnel and ssh paths under both
+       // the /arvados/v1/connect/ and /arvados/v1/containers/
+       // prefixes.
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
+       // Expected non-matches: an extra leading path segment, and a
+       // container path with no ssh/gateway_tunnel suffix.
+       c.Check(reTunnelPath.MatchString(`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, false)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`), check.Equals, false)
+}
+
 func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
        s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
 }
@@ -244,7 +253,7 @@ Clusters:
        started := make(chan bool, max+1)
        hold := make(chan bool)
        handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               if strings.HasPrefix(r.URL.Path, "/arvados/v1/connect") {
+               if strings.Contains(r.URL.Path, "/ssh") || strings.Contains(r.URL.Path, "/gateway_tunnel") {
                        <-hold
                } else {
                        started <- true
@@ -283,10 +292,11 @@ Clusters:
                activeReqs.Add(1)
                go func() {
                        defer activeReqs.Done()
-                       resp, err := client.Get("http://localhost:" + port + "/testpath")
+                       target := "http://localhost:" + port + "/testpath"
+                       resp, err := client.Get(target)
                        for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
                                time.Sleep(time.Second / 100)
-                               resp, err = client.Get("http://localhost:" + port + "/testpath")
+                               resp, err = client.Get(target)
                        }
                        if c.Check(err, check.IsNil) {
                                if resp.StatusCode == http.StatusOK {
@@ -302,14 +312,22 @@ Clusters:
        // API req limit
        extraTunnelReqs := 20
        var tunnelResp200, tunnelResp503 int64
+       var paths = []string{
+               "/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+               "/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+               "/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+               "/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+       }
        for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
+               i := i
                activeReqs.Add(1)
                go func() {
                        defer activeReqs.Done()
-                       resp, err := client.Get("http://localhost:" + port + "/arvados/v1/connect/...")
+                       target := "http://localhost:" + port + paths[i%len(paths)]
+                       resp, err := client.Post(target, "application/octet-stream", nil)
                        for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
                                time.Sleep(time.Second / 100)
-                               resp, err = client.Get("http://localhost:" + port + "/arvados/v1/connect/...")
+                               resp, err = client.Post(target, "application/octet-stream", nil)
                        }
                        if c.Check(err, check.IsNil) {
                                if resp.StatusCode == http.StatusOK {
@@ -327,6 +345,10 @@ Clusters:
                case <-started:
                case <-time.After(time.Second):
                        c.Logf("%s", stderr.String())
+                       c.Logf("apiResp200 %d", apiResp200)
+                       c.Logf("apiResp503 %d", apiResp503)
+                       c.Logf("tunnelResp200 %d", tunnelResp200)
+                       c.Logf("tunnelResp503 %d", tunnelResp503)
                        c.Fatal("timed out")
                }
        }
@@ -345,7 +367,7 @@ Clusters:
                c.Check(err, check.IsNil)
 
                for i := 0; i < len(loaded); i++ {
-                       if strings.HasPrefix(loaded[i].URL, "/arvados/v1/connect/") {
+                       if strings.Contains(loaded[i].URL, "/ssh") || strings.Contains(loaded[i].URL, "/gateway_tunnel") {
                                // Filter out a gateway tunnel req
                                // that doesn't count toward our API
                                // req limit