From 9084d255611326869a1a603b3269d307329a4c59 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Fri, 27 Oct 2023 11:44:38 -0400
Subject: [PATCH] Merge branch '21124-max-rails-reqs'

fixes #21124

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 lib/config/config.default.yml                      | 12 +++-
 lib/config/export.go                               |  1 +
 lib/service/cmd.go                                 | 23 ++++--
 lib/service/cmd_test.go                            | 71 ++++++++++++-------
 sdk/go/arvados/config.go                           |  1 +
 .../multi_host/aws/pillars/arvados.sls             |  3 +-
 .../aws/pillars/nginx_passenger.sls                |  2 +-
 7 files changed, 78 insertions(+), 35 deletions(-)

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index c10bf0e0f2..a633216be7 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -225,7 +225,17 @@ Clusters:
 
       # Maximum number of requests to process concurrently in a single
       # service process, or 0 for no limit.
-      MaxConcurrentRequests: 8
+      #
+      # Note this applies to all Arvados services (controller, webdav,
+      # websockets, etc.). Concurrency in the controller service is
+      # also effectively limited by MaxConcurrentRailsRequests (see
+      # below) because most controller requests proxy through to the
+      # RailsAPI service.
+      MaxConcurrentRequests: 64
+
+      # Maximum number of requests to process concurrently in a single
+      # RailsAPI service process, or 0 for no limit.
+      MaxConcurrentRailsRequests: 8
 
       # Maximum number of incoming requests to hold in a priority
       # queue waiting for one of the MaxConcurrentRequests slots to be
diff --git a/lib/config/export.go b/lib/config/export.go
index 3b577b023f..cbd9ff6d7f 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -68,6 +68,7 @@ var whitelist = map[string]bool{
	"API.KeepServiceRequestTimeout":           false,
	"API.LockBeforeUpdate":                    false,
	"API.LogCreateRequestFraction":            false,
+	"API.MaxConcurrentRailsRequests":          false,
	"API.MaxConcurrentRequests":               false,
	"API.MaxIndexDatabaseRead":                false,
	"API.MaxItemsPerResponse":                 true,
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 854b94861f..725f86f3bd 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -148,6 +148,19 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 		return 1
 	}
 
+	maxReqs := cluster.API.MaxConcurrentRequests
+	if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+		(maxRails < maxReqs || maxReqs == 0) &&
+		strings.HasSuffix(prog, "controller") {
+		// Ideally, we would accept up to
+		// MaxConcurrentRequests, and apply the
+		// MaxConcurrentRailsRequests limit only for requests
+		// that require calling upstream to RailsAPI. But for
+		// now we make the simplifying assumption that every
+		// controller request causes an upstream RailsAPI
+		// request.
+		maxReqs = maxRails
+	}
 	instrumented := httpserver.Instrument(reg, log,
 		httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
 			httpserver.AddRequestIDs(
@@ -156,7 +169,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 					interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
 						&httpserver.RequestLimiter{
 							Handler:                    handler,
-							MaxConcurrent:              cluster.API.MaxConcurrentRequests,
+							MaxConcurrent:              maxReqs,
 							MaxQueue:                   cluster.API.MaxQueuedRequests,
 							MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
 							Priority:                   c.requestPriority,
@@ -199,7 +212,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 		<-handler.Done()
 		srv.Close()
 	}()
-	go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
+	go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
 	err = srv.Wait()
 	if err != nil {
 		return 1
@@ -211,9 +224,9 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 // server's incoming HTTP request queue size. When it exceeds 90% of
 // API.MaxConcurrentRequests, write the /_inspect/requests data to a
 // JSON file in the specified directory.
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
 	outdir := cluster.SystemLogs.RequestQueueDumpDirectory
-	if outdir == "" || cluster.ManagementToken == "" || cluster.API.MaxConcurrentRequests < 1 {
+	if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
 		return
 	}
 	logger = logger.WithField("worker", "RequestQueueDump")
@@ -228,7 +241,7 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, r
 	for _, mf := range mfs {
 		if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
 			n := int(mf.Metric[0].GetGauge().GetValue())
-			if n > 0 && n >= cluster.API.MaxConcurrentRequests*9/10 {
+			if n > 0 && n >= maxReqs*9/10 {
 				dump = true
 				break
 			}
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
index 97a6bd8a4c..08b3a239dc 100644
--- a/lib/service/cmd_test.go
+++ b/lib/service/cmd_test.go
@@ -39,15 +39,19 @@ const (
 	contextKey key = iota
 )
 
-func (*Suite) TestGetListenAddress(c *check.C) {
+func unusedPort(c *check.C) string {
 	// Find an available port on the testing host, so the test
 	// cases don't get confused by "already in use" errors.
 	listener, err := net.Listen("tcp", ":")
 	c.Assert(err, check.IsNil)
-	_, unusedPort, err := net.SplitHostPort(listener.Addr().String())
-	c.Assert(err, check.IsNil)
 	listener.Close()
+	_, port, err := net.SplitHostPort(listener.Addr().String())
+	c.Assert(err, check.IsNil)
+	return port
+}
 
+func (*Suite) TestGetListenAddress(c *check.C) {
+	port := unusedPort(c)
 	defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
 	for idx, trial := range []struct {
 		// internalURL => listenURL, both with trailing "/"
@@ -60,17 +64,17 @@ func (*Suite) TestGetListenAddress(c *check.C) {
 		expectInternal   string
 	}{
 		{
-			internalURLs:   map[string]string{"http://localhost:" + unusedPort + "/": ""},
-			expectListen:   "http://localhost:" + unusedPort + "/",
-			expectInternal: "http://localhost:" + unusedPort + "/",
+			internalURLs:   map[string]string{"http://localhost:" + port + "/": ""},
+			expectListen:   "http://localhost:" + port + "/",
+			expectInternal: "http://localhost:" + port + "/",
 		},
 		{ // implicit port 80 in InternalURLs
 			internalURLs:     map[string]string{"http://localhost/": ""},
 			expectErrorMatch: `.*:80: bind: permission denied`,
 		},
 		{ // implicit port 443 in InternalURLs
-			internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + unusedPort + "/"},
-			expectListen:   "http://localhost:" + unusedPort + "/",
+			internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
+			expectListen:   "http://localhost:" + port + "/",
 			expectInternal: "https://host.example/",
 		},
 		{ // implicit port 443 in ListenURL
@@ -85,16 +89,16 @@ func (*Suite) TestGetListenAddress(c *check.C) {
 		{
 			internalURLs: map[string]string{
 				"https://hostname1.example/": "http://localhost:12435/",
-				"https://hostname2.example/": "http://localhost:" + unusedPort + "/",
+				"https://hostname2.example/": "http://localhost:" + port + "/",
 			},
 			envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
-			expectListen:   "http://localhost:" + unusedPort + "/",
+			expectListen:   "http://localhost:" + port + "/",
 			expectInternal: "https://hostname2.example/",
 		},
 		{ // cannot listen on any of the ListenURLs
 			internalURLs: map[string]string{
-				"https://hostname1.example/": "http://1.2.3.4:" + unusedPort + "/",
-				"https://hostname2.example/": "http://1.2.3.4:" + unusedPort + "/",
+				"https://hostname1.example/": "http://1.2.3.4:" + port + "/",
+				"https://hostname2.example/": "http://1.2.3.4:" + port + "/",
 			},
 			expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
 		},
@@ -194,10 +198,19 @@ func (*Suite) TestCommand(c *check.C) {
 	c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
 }
 
-func (*Suite) TestDumpRequests(c *check.C) {
+func (s *Suite) TestDumpRequestsKeepweb(c *check.C) {
+	s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
+}
+
+func (s *Suite) TestDumpRequestsController(c *check.C) {
+	s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
+}
+
+func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
 	defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
 	requestQueueDumpCheckInterval = time.Second / 10
 
+	port := unusedPort(c)
 	tmpdir := c.MkDir()
 	cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
 	c.Assert(err, check.IsNil)
@@ -211,14 +224,18 @@ Clusters:
     SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
     ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
     API:
-      MaxConcurrentRequests: %d
+      `+maxReqsConfigKey+`: %d
       MaxQueuedRequests: 0
     SystemLogs: {RequestQueueDumpDirectory: %q}
     Services:
       Controller:
-        ExternalURL: "http://localhost:12345"
-        InternalURLs: {"http://localhost:12345": {}}
+        ExternalURL: "http://localhost:`+port+`"
+        InternalURLs: {"http://localhost:`+port+`": {}}
+      WebDAV:
+        ExternalURL: "http://localhost:`+port+`"
+        InternalURLs: {"http://localhost:`+port+`": {}}
 `, max, tmpdir)
+	cf.Close()
 
 	started := make(chan bool, max+1)
 	hold := make(chan bool)
@@ -230,7 +247,7 @@ Clusters:
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+	cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
 		return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
 	})
 	cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
@@ -239,7 +256,7 @@ Clusters:
 
 	var stdin, stdout, stderr bytes.Buffer
 	go func() {
-		cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+		cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
 		close(exited)
 	}()
 	select {
@@ -252,10 +269,10 @@ Clusters:
 	deadline := time.Now().Add(time.Second * 2)
 	for i := 0; i < max+1; i++ {
 		go func() {
-			resp, err := client.Get("http://localhost:12345/testpath")
+			resp, err := client.Get("http://localhost:" + port + "/testpath")
 			for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
 				time.Sleep(time.Second / 100)
-				resp, err = client.Get("http://localhost:12345/testpath")
+				resp, err = client.Get("http://localhost:" + port + "/testpath")
 			}
 			if c.Check(err, check.IsNil) {
 				c.Logf("resp StatusCode %d", resp.StatusCode)
@@ -267,12 +284,12 @@ Clusters:
 		case <-started:
 		case <-time.After(time.Second):
 			c.Logf("%s", stderr.String())
-			panic("timed out")
+			c.Fatal("timed out")
 		}
 	}
 	for delay := time.Second / 100; ; delay = delay * 2 {
 		time.Sleep(delay)
-		j, err := os.ReadFile(tmpdir + "/arvados-controller-requests.json")
+		j, err := os.ReadFile(tmpdir + "/" + string(serviceName) + "-requests.json")
 		if os.IsNotExist(err) && deadline.After(time.Now()) {
 			continue
 		}
@@ -298,7 +315,7 @@ Clusters:
 	}
 
 	for _, path := range []string{"/_inspect/requests", "/metrics"} {
-		req, err := http.NewRequest("GET", "http://localhost:12345"+path, nil)
+		req, err := http.NewRequest("GET", "http://localhost:"+port+path, nil)
 		c.Assert(err, check.IsNil)
 		req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
 		resp, err := client.Do(req)
@@ -320,10 +337,10 @@ Clusters:
 	}
 	close(hold)
 	cancel()
-
 }
 
 func (*Suite) TestTLS(c *check.C) {
+	port := unusedPort(c)
 	cwd, err := os.Getwd()
 	c.Assert(err, check.IsNil)
 
@@ -333,8 +350,8 @@ Clusters:
   SystemRootToken: abcde
   Services:
    Controller:
-    ExternalURL: "https://localhost:12345"
-    InternalURLs: {"https://localhost:12345": {}}
+    ExternalURL: "https://localhost:` + port + `"
+    InternalURLs: {"https://localhost:` + port + `": {}}
   TLS:
    Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
    Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
@@ -359,7 +376,7 @@ Clusters:
 	defer close(got)
 	client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
 	for range time.NewTicker(time.Millisecond).C {
-		resp, err := client.Get("https://localhost:12345")
+		resp, err := client.Get("https://localhost:" + port)
 		if err != nil {
 			c.Log(err)
 			continue
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 0afcf2be3c..64424fc3e9 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -99,6 +99,7 @@ type Cluster struct {
 		DisabledAPIs                     StringSet
 		MaxIndexDatabaseRead             int
 		MaxItemsPerResponse              int
+		MaxConcurrentRailsRequests       int
 		MaxConcurrentRequests            int
 		MaxQueuedRequests                int
 		MaxQueueTimeForLockRequests      Duration
diff --git a/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls b/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls
index 2944b4073c..dc98c43ace 100644
--- a/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls
+++ b/tools/salt-install/config_examples/multi_host/aws/pillars/arvados.sls
@@ -117,7 +117,8 @@ arvados:
 
     ### API
     API:
-      MaxConcurrentRequests: {{ max_workers * 2 }}
+      MaxConcurrentRailsRequests: {{ max_workers * 2 }}
+      MaxConcurrentRequests: {{ max_reqs }}
       MaxQueuedRequests: {{ max_reqs }}
 
     ### CONTAINERS
diff --git a/tools/salt-install/config_examples/multi_host/aws/pillars/nginx_passenger.sls b/tools/salt-install/config_examples/multi_host/aws/pillars/nginx_passenger.sls
index 4c0aea25fe..de4c830906 100644
--- a/tools/salt-install/config_examples/multi_host/aws/pillars/nginx_passenger.sls
+++ b/tools/salt-install/config_examples/multi_host/aws/pillars/nginx_passenger.sls
@@ -29,7 +29,7 @@ nginx:
       # Make the passenger queue small (twice the concurrency, so
       # there's at most one pending request for each busy worker)
       # because controller reorders requests based on priority, and
-      # won't send more than API.MaxConcurrentRequests to passenger
+      # won't send more than API.MaxConcurrentRailsRequests to passenger
       # (which is max_workers * 2), so things that are moved to the head
       # of the line get processed quickly.
       passenger_max_request_queue_size: {{ max_workers * 2 + 1 }}
-- 
2.30.2
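
A note on the MaxConcurrentRequests / MaxConcurrentRailsRequests
interaction configured above: the limit-selection rule added to
lib/service/cmd.go reduces to the standalone sketch below. This is an
illustration only; effectiveMaxReqs is a made-up name, not part of the
Arvados codebase.

    // Sketch of the limit-selection rule in lib/service/cmd.go.
    package main

    import (
        "fmt"
        "strings"
    )

    // effectiveMaxReqs returns the concurrency cap a service process
    // should enforce. In the controller, a lower (nonzero) RailsAPI
    // limit wins, because every controller request is assumed to
    // proxy through to RailsAPI.
    func effectiveMaxReqs(prog string, maxReqs, maxRails int) int {
        if maxRails > 0 && (maxRails < maxReqs || maxReqs == 0) &&
            strings.HasSuffix(prog, "controller") {
            return maxRails
        }
        return maxReqs
    }

    func main() {
        fmt.Println(effectiveMaxReqs("arvados-controller", 64, 8)) // 8: Rails limit clamps controller
        fmt.Println(effectiveMaxReqs("keep-web", 64, 8))           // 64: other services are unaffected
        fmt.Println(effectiveMaxReqs("arvados-controller", 0, 8))  // 8: "no limit" (0) is still clamped
    }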
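
The RequestLimiter wiring in lib/service/cmd.go caps in-flight requests
per process. The underlying technique, a concurrency limit in front of
an http.Handler, can be reduced to the channel-semaphore middleware
sketched below. This is not the Arvados implementation: the real
httpserver.RequestLimiter also holds up to MaxQueue waiting requests in
a priority queue, while this sketch simply sheds load. limitConcurrent
is a hypothetical name.

    // Minimal concurrency-limiting HTTP middleware sketch.
    package main

    import "net/http"

    func limitConcurrent(max int, h http.Handler) http.Handler {
        sem := make(chan struct{}, max) // one token per in-flight request
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            select {
            case sem <- struct{}{}: // acquire a slot
                defer func() { <-sem }() // release when done
                h.ServeHTTP(w, r)
            default: // all slots busy: reject instead of queueing
                http.Error(w, "server busy", http.StatusServiceUnavailable)
            }
        })
    }

    func main() {
        ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            w.Write([]byte("ok\n"))
        })
        http.ListenAndServe(":8080", limitConcurrent(8, ok))
    }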
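
The queue-dump trigger in requestQueueDumpCheck compares the
arvados_concurrent_requests gauge against maxReqs*9/10, and Go integer
division rounds that threshold down. A quick check of the boundary
values (shouldDump is an illustrative helper mirroring the comparison,
not an Arvados function):

    package main

    import "fmt"

    // shouldDump mirrors the trigger condition in cmd.go.
    func shouldDump(n, maxReqs int) bool {
        return n > 0 && n >= maxReqs*9/10
    }

    func main() {
        // With maxReqs=8, 8*9/10 == 7, so the dump fires once 7 of
        // the 8 slots are in use.
        for _, n := range []int{6, 7, 8} {
            fmt.Printf("n=%d dump=%v\n", n, shouldDump(n, 8))
        }
    }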
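
On the Salt pillar arithmetic above: for a hypothetical host deployed
with max_workers = 4, the templates render MaxConcurrentRailsRequests =
8 and passenger_max_request_queue_size = 9, one more than the most
requests controller will ever send to Passenger at once. The values 4
and 128 below are arbitrary, for illustration only.

    package main

    import "fmt"

    func main() {
        maxWorkers := 4 // deployment-time pillar value (hypothetical)
        maxReqs := 128  // deployment-time pillar value (hypothetical)

        fmt.Println("MaxConcurrentRailsRequests:", maxWorkers*2)         // 8
        fmt.Println("MaxConcurrentRequests/MaxQueuedRequests:", maxReqs) // 128
        fmt.Println("passenger_max_request_queue_size:", maxWorkers*2+1) // 9
    }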