Merge branch '21611-log-chunk-delay'
[arvados.git] / lib / service / cmd_test.go
index ee0d4bb836339897e43e5877e7e83701a19ea4ba..9ead90019e1302917b7f8d60448eb8b3f27d0bf6 100644 (file)
@@ -17,6 +17,8 @@ import (
        "net/url"
        "os"
        "strings"
+       "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -39,15 +41,19 @@ const (
        contextKey key = iota
 )
 
-func (*Suite) TestGetListenAddress(c *check.C) {
+func unusedPort(c *check.C) string {
        // Find an available port on the testing host, so the test
        // cases don't get confused by "already in use" errors.
        listener, err := net.Listen("tcp", ":")
        c.Assert(err, check.IsNil)
-       _, unusedPort, err := net.SplitHostPort(listener.Addr().String())
-       c.Assert(err, check.IsNil)
        listener.Close()
+       _, port, err := net.SplitHostPort(listener.Addr().String())
+       c.Assert(err, check.IsNil)
+       return port
+}
 
+func (*Suite) TestGetListenAddress(c *check.C) {
+       port := unusedPort(c)
        defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
        for idx, trial := range []struct {
                // internalURL => listenURL, both with trailing "/"
@@ -60,17 +66,17 @@ func (*Suite) TestGetListenAddress(c *check.C) {
                expectInternal   string
        }{
                {
-                       internalURLs:   map[string]string{"http://localhost:" + unusedPort + "/": ""},
-                       expectListen:   "http://localhost:" + unusedPort + "/",
-                       expectInternal: "http://localhost:" + unusedPort + "/",
+                       internalURLs:   map[string]string{"http://localhost:" + port + "/": ""},
+                       expectListen:   "http://localhost:" + port + "/",
+                       expectInternal: "http://localhost:" + port + "/",
                },
                { // implicit port 80 in InternalURLs
                        internalURLs:     map[string]string{"http://localhost/": ""},
                        expectErrorMatch: `.*:80: bind: permission denied`,
                },
                { // implicit port 443 in InternalURLs
-                       internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + unusedPort + "/"},
-                       expectListen:   "http://localhost:" + unusedPort + "/",
+                       internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
+                       expectListen:   "http://localhost:" + port + "/",
                        expectInternal: "https://host.example/",
                },
                { // implicit port 443 in ListenURL
@@ -85,16 +91,16 @@ func (*Suite) TestGetListenAddress(c *check.C) {
                {
                        internalURLs: map[string]string{
                                "https://hostname1.example/": "http://localhost:12435/",
-                               "https://hostname2.example/": "http://localhost:" + unusedPort + "/",
+                               "https://hostname2.example/": "http://localhost:" + port + "/",
                        },
                        envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
-                       expectListen:   "http://localhost:" + unusedPort + "/",
+                       expectListen:   "http://localhost:" + port + "/",
                        expectInternal: "https://hostname2.example/",
                },
                { // cannot listen on any of the ListenURLs
                        internalURLs: map[string]string{
-                               "https://hostname1.example/": "http://1.2.3.4:" + unusedPort + "/",
-                               "https://hostname2.example/": "http://1.2.3.4:" + unusedPort + "/",
+                               "https://hostname1.example/": "http://1.2.3.4:" + port + "/",
+                               "https://hostname2.example/": "http://1.2.3.4:" + port + "/",
                        },
                        expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
                },
@@ -194,10 +200,28 @@ func (*Suite) TestCommand(c *check.C) {
        c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
 }
 
-func (*Suite) TestDumpRequests(c *check.C) {
+func (s *Suite) TestTunnelPathRegexp(c *check.C) {
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
+       c.Check(reTunnelPath.MatchString(`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, false)
+       c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`), check.Equals, false)
+}
+
+func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
+       s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
+}
+
+func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) {
+       s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
+}
+
+func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
        defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
        requestQueueDumpCheckInterval = time.Second / 10
 
+       port := unusedPort(c)
        tmpdir := c.MkDir()
        cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
        c.Assert(err, check.IsNil)
@@ -205,32 +229,42 @@ func (*Suite) TestDumpRequests(c *check.C) {
        defer cf.Close()
 
        max := 24
+       maxTunnels := 30
        fmt.Fprintf(cf, `
 Clusters:
  zzzzz:
   SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
   ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
   API:
-   MaxConcurrentRailsRequests: %d
-   MaxQueuedRequests: 0
+   `+maxReqsConfigKey+`: %d
+   MaxQueuedRequests: 1
+   MaxGatewayTunnels: %d
   SystemLogs: {RequestQueueDumpDirectory: %q}
   Services:
    Controller:
-    ExternalURL: "http://localhost:12345"
-    InternalURLs: {"http://localhost:12345": {}}
-`, max, tmpdir)
+    ExternalURL: "http://localhost:`+port+`"
+    InternalURLs: {"http://localhost:`+port+`": {}}
+   WebDAV:
+    ExternalURL: "http://localhost:`+port+`"
+    InternalURLs: {"http://localhost:`+port+`": {}}
+`, max, maxTunnels, tmpdir)
+       cf.Close()
 
        started := make(chan bool, max+1)
        hold := make(chan bool)
        handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               started <- true
-               <-hold
+               // Gateway tunnel reqs don't count toward the API req
+               // limit, so only non-tunnel reqs signal "started".
+               if !strings.Contains(r.URL.Path, "/ssh") && !strings.Contains(r.URL.Path, "/gateway_tunnel") {
+                       started <- true
+               }
+               <-hold
        })
        healthCheck := make(chan bool, 1)
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
-       cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+       cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
                return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
        })
        cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
@@ -239,7 +273,7 @@ Clusters:
        var stdin, stdout, stderr bytes.Buffer
 
        go func() {
-               cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+               cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
                close(exited)
        }()
        select {
@@ -250,15 +284,59 @@ Clusters:
        }
        client := http.Client{}
        deadline := time.Now().Add(time.Second * 2)
+       var activeReqs sync.WaitGroup
+       var apiResp200, apiResp503, tunnelResp200, tunnelResp503 int64 // updated atomically
+
+       // Start some API reqs
        for i := 0; i < max+1; i++ {
+               activeReqs.Add(1)
                go func() {
-                       resp, err := client.Get("http://localhost:12345/testpath")
+                       defer activeReqs.Done()
+                       target := "http://localhost:" + port + "/testpath"
+                       resp, err := client.Get(target)
                        for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
                                time.Sleep(time.Second / 100)
-                               resp, err = client.Get("http://localhost:12345/testpath")
+                               resp, err = client.Get(target)
                        }
                        if c.Check(err, check.IsNil) {
-                               c.Logf("resp StatusCode %d", resp.StatusCode)
+                               resp.Body.Close()
+                               if resp.StatusCode == http.StatusOK {
+                                       atomic.AddInt64(&apiResp200, 1)
+                               } else if resp.StatusCode == http.StatusServiceUnavailable {
+                                       atomic.AddInt64(&apiResp503, 1)
+                               }
+                       }
+               }()
+       }
+
+       // Start some gateway tunnel reqs, which don't count toward our API req limit
+       extraTunnelReqs := 20
+       var paths = []string{
+               "/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+               "/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+               "/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+               "/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+       }
+       for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
+               i := i
+               activeReqs.Add(1)
+               go func() {
+                       defer activeReqs.Done()
+                       target := "http://localhost:" + port + paths[i%len(paths)]
+                       resp, err := client.Post(target, "application/octet-stream", nil)
+                       for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
+                               time.Sleep(time.Second / 100)
+                               resp, err = client.Post(target, "application/octet-stream", nil)
+                       }
+                       if c.Check(err, check.IsNil) {
+                               resp.Body.Close()
+                               if resp.StatusCode == http.StatusOK {
+                                       atomic.AddInt64(&tunnelResp200, 1)
+                               } else if resp.StatusCode == http.StatusServiceUnavailable {
+                                       atomic.AddInt64(&tunnelResp503, 1)
+                               } else {
+                                       c.Errorf("tunnel response code %d", resp.StatusCode)
+                               }
                        }
                }()
        }
@@ -267,12 +345,16 @@ Clusters:
                case <-started:
                case <-time.After(time.Second):
                        c.Logf("%s", stderr.String())
-                       panic("timed out")
+                       c.Logf("apiResp200 %d", atomic.LoadInt64(&apiResp200))
+                       c.Logf("apiResp503 %d", atomic.LoadInt64(&apiResp503))
+                       c.Logf("tunnelResp200 %d", atomic.LoadInt64(&tunnelResp200))
+                       c.Logf("tunnelResp503 %d", atomic.LoadInt64(&tunnelResp503))
+                       c.Fatal("timed out")
                }
        }
        for delay := time.Second / 100; ; delay = delay * 2 {
                time.Sleep(delay)
-               j, err := os.ReadFile(tmpdir + "/arvados-controller-requests.json")
+               j, err := os.ReadFile(tmpdir + "/" + string(serviceName) + "-requests.json")
                if os.IsNotExist(err) && deadline.After(time.Now()) {
                        continue
                }
@@ -283,6 +365,20 @@ Clusters:
                var loaded []struct{ URL string }
                err = json.Unmarshal(j, &loaded)
                c.Check(err, check.IsNil)
+
+               // Filter out gateway tunnel reqs, which don't count
+               // toward our API req limit. Filter in place (reusing
+               // loaded's backing array), keeping the remaining reqs
+               // in their original order.
+               kept := loaded[:0]
+               for _, req := range loaded {
+                       if strings.Contains(req.URL, "/ssh") || strings.Contains(req.URL, "/gateway_tunnel") {
+                               continue
+                       }
+                       kept = append(kept, req)
+               }
+               loaded = kept
+
                if len(loaded) < max {
                        // Dumped when #requests was >90% but <100% of
                        // limit. If we stop now, we won't be able to
@@ -292,13 +388,13 @@ Clusters:
                        c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
                        continue
                }
-               c.Check(loaded, check.HasLen, max)
+               c.Check(loaded, check.HasLen, max+1)
                c.Check(loaded[0].URL, check.Equals, "/testpath")
                break
        }
 
        for _, path := range []string{"/_inspect/requests", "/metrics"} {
-               req, err := http.NewRequest("GET", "http://localhost:12345"+path, nil)
+               req, err := http.NewRequest("GET", "http://localhost:"+port+path, nil)
                c.Assert(err, check.IsNil)
                req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
                resp, err := client.Do(req)
@@ -311,7 +407,8 @@ Clusters:
                c.Check(err, check.IsNil)
                switch path {
                case "/metrics":
-                       c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
+                       c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`)
+                       c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`)
                case "/_inspect/requests":
                        c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
                default:
@@ -319,11 +416,16 @@ Clusters:
                }
        }
        close(hold)
+       activeReqs.Wait()
+       c.Check(int(apiResp200), check.Equals, max+1)
+       c.Check(int(apiResp503), check.Equals, 0)
+       c.Check(int(tunnelResp200), check.Equals, maxTunnels)
+       c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs)
        cancel()
-
 }
 
 func (*Suite) TestTLS(c *check.C) {
+       port := unusedPort(c)
        cwd, err := os.Getwd()
        c.Assert(err, check.IsNil)
 
@@ -333,8 +435,8 @@ Clusters:
   SystemRootToken: abcde
   Services:
    Controller:
-    ExternalURL: "https://localhost:12345"
-    InternalURLs: {"https://localhost:12345": {}}
+    ExternalURL: "https://localhost:` + port + `"
+    InternalURLs: {"https://localhost:` + port + `": {}}
   TLS:
    Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
    Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
@@ -359,7 +461,7 @@ Clusters:
                defer close(got)
                client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
                for range time.NewTicker(time.Millisecond).C {
-                       resp, err := client.Get("https://localhost:12345")
+                       resp, err := client.Get("https://localhost:" + port)
                        if err != nil {
                                c.Log(err)
                                continue