Merge branch '21124-max-rails-reqs'
author Tom Clegg <tom@curii.com>
Fri, 27 Oct 2023 15:44:38 +0000 (11:44 -0400)
committer Tom Clegg <tom@curii.com>
Fri, 27 Oct 2023 15:44:38 +0000 (11:44 -0400)
fixes #21124

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

doc/admin/upgrading.html.textile.liquid
lib/config/config.default.yml
lib/config/export.go
lib/service/cmd.go
lib/service/cmd_test.go
sdk/go/arvados/config.go
tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls
tools/salt-install/config_examples/multi_host/aws/pillars/nginx_passenger.sls

doc/admin/upgrading.html.textile.liquid
index 46b008ca70d4e6a45c9408fec69f64ba18b7d239..ee7ac4e44e97aeebb860868e872f7b57270fa879 100644
@@ -28,6 +28,14 @@ TODO: extract this information based on git commit messages and generate changel
 <div class="releasenotes">
 </notextile>
 
+h2(#main). development main
+
+"previous: Upgrading to 2.7.0":#v2_7_0
+
+The default configuration value @API.MaxConcurrentRequests@ (the maximum number of concurrent requests processed by a single instance of an Arvados service process) is raised from 8 to 64.
+
+A new configuration key @API.MaxConcurrentRailsRequests@ (default 8) limits the number of concurrent requests processed by a single RailsAPI service process.
+
 h2(#v2_7_0). v2.7.0 (2023-09-21)
 
 "previous: Upgrading to 2.6.3":#v2_6_3
lib/config/config.default.yml
index 32727b1bce78226ac34e27d3eb9791c173a34cbc..abffc1c0b801a1683462bd29419a5b200ce75d39 100644
@@ -225,7 +225,17 @@ Clusters:
 
       # Maximum number of concurrent requests to process concurrently
       # in a single service process, or 0 for no limit.
-      MaxConcurrentRequests: 8
+      #
+      # Note this applies to all Arvados services (controller, webdav,
+      # websockets, etc.). Concurrency in the controller service is
+      # also effectively limited by MaxConcurrentRailsRequests (see
+      # below) because most controller requests proxy through to the
+      # RailsAPI service.
+      MaxConcurrentRequests: 64
+
+      # Maximum number of requests to process concurrently in a
+      # single RailsAPI service process, or 0 for no limit.
+      MaxConcurrentRailsRequests: 8
 
       # Maximum number of incoming requests to hold in a priority
       # queue waiting for one of the MaxConcurrentRequests slots to be
lib/config/export.go
index 88c64f69a10cf66d641db65f96a51de8a38c7dfd..e1f5ff9ee13473d77328e8c8521d9d4370b76483 100644
@@ -68,6 +68,7 @@ var whitelist = map[string]bool{
        "API.KeepServiceRequestTimeout":            false,
        "API.LockBeforeUpdate":                     false,
        "API.LogCreateRequestFraction":             false,
+       "API.MaxConcurrentRailsRequests":           false,
        "API.MaxConcurrentRequests":                false,
        "API.MaxIndexDatabaseRead":                 false,
        "API.MaxItemsPerResponse":                  true,
lib/service/cmd.go
index 854b94861f1362f9e58592f771400bed28b9afaa..725f86f3bda5c2a82476615ba9ecd6e7a9b7a4fa 100644
@@ -148,6 +148,19 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                return 1
        }
 
+       maxReqs := cluster.API.MaxConcurrentRequests
+       if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+               (maxRails < maxReqs || maxReqs == 0) &&
+               strings.HasSuffix(prog, "controller") {
+               // Ideally, we would accept up to
+               // MaxConcurrentRequests, and apply the
+               // MaxConcurrentRailsRequests limit only for requests
+               // that require calling upstream to RailsAPI. But for
+               // now we make the simplifying assumption that every
+               // controller request causes an upstream RailsAPI
+               // request.
+               maxReqs = maxRails
+       }
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
@@ -156,7 +169,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                                                interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
                                                        &httpserver.RequestLimiter{
                                                                Handler:                    handler,
-                                                               MaxConcurrent:              cluster.API.MaxConcurrentRequests,
+                                                               MaxConcurrent:              maxReqs,
                                                                MaxQueue:                   cluster.API.MaxQueuedRequests,
                                                                MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
                                                                Priority:                   c.requestPriority,
@@ -199,7 +212,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                <-handler.Done()
                srv.Close()
        }()
-       go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
+       go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
        err = srv.Wait()
        if err != nil {
                return 1
@@ -211,9 +224,9 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 // server's incoming HTTP request queue size. When it exceeds 90% of
 // API.MaxConcurrentRequests, write the /_inspect/requests data to a
 // JSON file in the specified directory.
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
        outdir := cluster.SystemLogs.RequestQueueDumpDirectory
-       if outdir == "" || cluster.ManagementToken == "" || cluster.API.MaxConcurrentRequests < 1 {
+       if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
                return
        }
        logger = logger.WithField("worker", "RequestQueueDump")
@@ -228,7 +241,7 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, r
                for _, mf := range mfs {
                        if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
                                n := int(mf.Metric[0].GetGauge().GetValue())
-                               if n > 0 && n >= cluster.API.MaxConcurrentRequests*9/10 {
+                               if n > 0 && n >= maxReqs*9/10 {
                                        dump = true
                                        break
                                }
lib/service/cmd_test.go
index 97a6bd8a4c979cd26b39c4c2cd93c75e82dbcd13..08b3a239dc2583c5da4271465cc550521dc54a79 100644
@@ -39,15 +39,19 @@ const (
        contextKey key = iota
 )
 
-func (*Suite) TestGetListenAddress(c *check.C) {
+func unusedPort(c *check.C) string {
        // Find an available port on the testing host, so the test
        // cases don't get confused by "already in use" errors.
        listener, err := net.Listen("tcp", ":")
        c.Assert(err, check.IsNil)
-       _, unusedPort, err := net.SplitHostPort(listener.Addr().String())
-       c.Assert(err, check.IsNil)
        listener.Close()
+       _, port, err := net.SplitHostPort(listener.Addr().String())
+       c.Assert(err, check.IsNil)
+       return port
+}
 
+func (*Suite) TestGetListenAddress(c *check.C) {
+       port := unusedPort(c)
        defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
        for idx, trial := range []struct {
                // internalURL => listenURL, both with trailing "/"
@@ -60,17 +64,17 @@ func (*Suite) TestGetListenAddress(c *check.C) {
                expectInternal   string
        }{
                {
-                       internalURLs:   map[string]string{"http://localhost:" + unusedPort + "/": ""},
-                       expectListen:   "http://localhost:" + unusedPort + "/",
-                       expectInternal: "http://localhost:" + unusedPort + "/",
+                       internalURLs:   map[string]string{"http://localhost:" + port + "/": ""},
+                       expectListen:   "http://localhost:" + port + "/",
+                       expectInternal: "http://localhost:" + port + "/",
                },
                { // implicit port 80 in InternalURLs
                        internalURLs:     map[string]string{"http://localhost/": ""},
                        expectErrorMatch: `.*:80: bind: permission denied`,
                },
                { // implicit port 443 in InternalURLs
-                       internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + unusedPort + "/"},
-                       expectListen:   "http://localhost:" + unusedPort + "/",
+                       internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
+                       expectListen:   "http://localhost:" + port + "/",
                        expectInternal: "https://host.example/",
                },
                { // implicit port 443 in ListenURL
@@ -85,16 +89,16 @@ func (*Suite) TestGetListenAddress(c *check.C) {
                {
                        internalURLs: map[string]string{
                                "https://hostname1.example/": "http://localhost:12435/",
-                               "https://hostname2.example/": "http://localhost:" + unusedPort + "/",
+                               "https://hostname2.example/": "http://localhost:" + port + "/",
                        },
                        envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
-                       expectListen:   "http://localhost:" + unusedPort + "/",
+                       expectListen:   "http://localhost:" + port + "/",
                        expectInternal: "https://hostname2.example/",
                },
                { // cannot listen on any of the ListenURLs
                        internalURLs: map[string]string{
-                               "https://hostname1.example/": "http://1.2.3.4:" + unusedPort + "/",
-                               "https://hostname2.example/": "http://1.2.3.4:" + unusedPort + "/",
+                               "https://hostname1.example/": "http://1.2.3.4:" + port + "/",
+                               "https://hostname2.example/": "http://1.2.3.4:" + port + "/",
                        },
                        expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
                },
@@ -194,10 +198,19 @@ func (*Suite) TestCommand(c *check.C) {
        c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
 }
 
-func (*Suite) TestDumpRequests(c *check.C) {
+func (s *Suite) TestDumpRequestsKeepweb(c *check.C) {
+       s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
+}
+
+func (s *Suite) TestDumpRequestsController(c *check.C) {
+       s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
+}
+
+func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
        defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
        requestQueueDumpCheckInterval = time.Second / 10
 
+       port := unusedPort(c)
        tmpdir := c.MkDir()
        cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
        c.Assert(err, check.IsNil)
@@ -211,14 +224,18 @@ Clusters:
   SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
   ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
   API:
-   MaxConcurrentRequests: %d
+   `+maxReqsConfigKey+`: %d
    MaxQueuedRequests: 0
   SystemLogs: {RequestQueueDumpDirectory: %q}
   Services:
    Controller:
-    ExternalURL: "http://localhost:12345"
-    InternalURLs: {"http://localhost:12345": {}}
+    ExternalURL: "http://localhost:`+port+`"
+    InternalURLs: {"http://localhost:`+port+`": {}}
+   WebDAV:
+    ExternalURL: "http://localhost:`+port+`"
+    InternalURLs: {"http://localhost:`+port+`": {}}
 `, max, tmpdir)
+       cf.Close()
 
        started := make(chan bool, max+1)
        hold := make(chan bool)
@@ -230,7 +247,7 @@ Clusters:
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
-       cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+       cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
                return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
        })
        cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
@@ -239,7 +256,7 @@ Clusters:
        var stdin, stdout, stderr bytes.Buffer
 
        go func() {
-               cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+               cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
                close(exited)
        }()
        select {
@@ -252,10 +269,10 @@ Clusters:
        deadline := time.Now().Add(time.Second * 2)
        for i := 0; i < max+1; i++ {
                go func() {
-                       resp, err := client.Get("http://localhost:12345/testpath")
+                       resp, err := client.Get("http://localhost:" + port + "/testpath")
                        for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
                                time.Sleep(time.Second / 100)
-                               resp, err = client.Get("http://localhost:12345/testpath")
+                               resp, err = client.Get("http://localhost:" + port + "/testpath")
                        }
                        if c.Check(err, check.IsNil) {
                                c.Logf("resp StatusCode %d", resp.StatusCode)
@@ -267,12 +284,12 @@ Clusters:
                case <-started:
                case <-time.After(time.Second):
                        c.Logf("%s", stderr.String())
-                       panic("timed out")
+                       c.Fatal("timed out")
                }
        }
        for delay := time.Second / 100; ; delay = delay * 2 {
                time.Sleep(delay)
-               j, err := os.ReadFile(tmpdir + "/arvados-controller-requests.json")
+               j, err := os.ReadFile(tmpdir + "/" + string(serviceName) + "-requests.json")
                if os.IsNotExist(err) && deadline.After(time.Now()) {
                        continue
                }
@@ -298,7 +315,7 @@ Clusters:
        }
 
        for _, path := range []string{"/_inspect/requests", "/metrics"} {
-               req, err := http.NewRequest("GET", "http://localhost:12345"+path, nil)
+               req, err := http.NewRequest("GET", "http://localhost:"+port+path, nil)
                c.Assert(err, check.IsNil)
                req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
                resp, err := client.Do(req)
@@ -320,10 +337,10 @@ Clusters:
        }
        close(hold)
        cancel()
-
 }
 
 func (*Suite) TestTLS(c *check.C) {
+       port := unusedPort(c)
        cwd, err := os.Getwd()
        c.Assert(err, check.IsNil)
 
@@ -333,8 +350,8 @@ Clusters:
   SystemRootToken: abcde
   Services:
    Controller:
-    ExternalURL: "https://localhost:12345"
-    InternalURLs: {"https://localhost:12345": {}}
+    ExternalURL: "https://localhost:` + port + `"
+    InternalURLs: {"https://localhost:` + port + `": {}}
   TLS:
    Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
    Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
@@ -359,7 +376,7 @@ Clusters:
                defer close(got)
                client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
                for range time.NewTicker(time.Millisecond).C {
-                       resp, err := client.Get("https://localhost:12345")
+                       resp, err := client.Get("https://localhost:" + port)
                        if err != nil {
                                c.Log(err)
                                continue
sdk/go/arvados/config.go
index a3e54952da483c46a87416b200291bc65b66304b..6e6c5298e46f0f5c61b5a978b312aaa49c70c62e 100644
@@ -99,6 +99,7 @@ type Cluster struct {
                DisabledAPIs                     StringSet
                MaxIndexDatabaseRead             int
                MaxItemsPerResponse              int
+               MaxConcurrentRailsRequests       int
                MaxConcurrentRequests            int
                MaxQueuedRequests                int
                MaxQueueTimeForLockRequests      Duration
tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls
index 84df363c2e645c594e25378a7de3fe187dcb2069..064a70a8ed47d70b38c13bd531bcc07db30c8fbf 100644
@@ -117,7 +117,8 @@ arvados:
 
     ### API
     API:
-      MaxConcurrentRequests: {{ max_workers * 2 }}
+      MaxConcurrentRailsRequests: {{ max_workers * 2 }}
+      MaxConcurrentRequests: {{ max_reqs }}
       MaxQueuedRequests: {{ max_reqs }}
 
     ### CONTAINERS
tools/salt-install/config_examples/multi_host/aws/pillars/nginx_passenger.sls
index 4c0aea25fe7ada8c6b9cf0f0853df8f56fdd2f52..de4c830906ff3418a22fe06175173424b816aec1 100644
@@ -29,7 +29,7 @@ nginx:
     # Make the passenger queue small (twice the concurrency, so
     # there's at most one pending request for each busy worker)
     # because controller reorders requests based on priority, and
-    # won't send more than API.MaxConcurrentRequests to passenger
+    # won't send more than API.MaxConcurrentRailsRequests to passenger
     # (which is max_workers * 2), so things that are moved to the head
     # of the line get processed quickly.
     passenger_max_request_queue_size: {{ max_workers * 2 + 1 }}
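
To make the sizing arithmetic above concrete, here is a hypothetical rendering of the relevant pillar values with max_workers = 4; the max_reqs figure of 128 is an assumed example value, not something defined in this commit:

    # arvados.sls, API section (rendered):
    MaxConcurrentRailsRequests: 8        # max_workers * 2
    MaxConcurrentRequests: 128           # max_reqs (assumed)
    MaxQueuedRequests: 128               # max_reqs (assumed)

    # nginx_passenger.sls (rendered):
    passenger_max_request_queue_size: 9  # max_workers * 2 + 1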