20475: Option to dump active requests when queue is >=90% full. 20475-dump-busy-queue
author Tom Clegg <tom@curii.com>
Wed, 3 May 2023 20:40:59 +0000 (16:40 -0400)
committer Tom Clegg <tom@curii.com>
Wed, 3 May 2023 20:40:59 +0000 (16:40 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/service/cmd.go
lib/service/cmd_test.go
sdk/go/arvados/config.go

index cf9406b2bdab196d83d299b3d6028dd85dccfb59..8203a94de9dceae9c9d2af55358d81cc20090fb9 100644 (file)
@@ -442,6 +442,15 @@ Clusters:
       # params_truncated.
       MaxRequestLogParamsSize: 2000
 
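+      # In all services except RailsAPI, check once a minute whether
+      # the incoming HTTP request queue is nearly full (at least 90%
+      # of API.MaxConcurrentRequests) and, if so, write a snapshot of
+      # the request queue to {program}-requests.json (for example,
+      # arvados-controller-requests.json) in the specified directory,
+      # replacing any previous snapshot. This also requires
+      # ManagementToken to be set.
+      #
+      # Leave blank to disable.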
+      RequestQueueDumpDirectory: ""
+
     Collections:
 
       # Enable access controls for data stored in Keep. This should
index 20441c2a6c4534eb697a85bfc4c369e64ae0aad9..cc6938cbc6e1ff3f680a4ca024fcc698e946d713 100644 (file)
@@ -12,10 +12,12 @@ import (
        "io"
        "net"
        "net/http"
+       "net/http/httptest"
        _ "net/http/pprof"
        "net/url"
        "os"
        "strings"
+       "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/config"
@@ -45,6 +47,8 @@ type command struct {
        ctx        context.Context // enables tests to shutdown service; no public API yet
 }
 
+var requestQueueDumpCheckInterval = time.Minute
+
 // Command returns a cmd.Handler that loads site config, calls
 // newHandler with the current cluster and node configs, and brings up
 // an http server with the returned handler.
@@ -189,6 +193,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                <-handler.Done()
                srv.Close()
        }()
+       go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
        err = srv.Wait()
        if err != nil {
                return 1
@@ -196,6 +201,55 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        return 0
 }
 
+// requestQueueDumpCheck monitors the server's incoming HTTP request
+// queue when SystemLogs.RequestQueueDumpDirectory is set. Every
+// requestQueueDumpCheckInterval it reads the
+// arvados_concurrent_requests gauge from reg. When that value
+// reaches 90% of API.MaxConcurrentRequests (rounded down), it fetches
+// /_inspect/requests from srv's handler, authenticating with
+// ManagementToken, and saves the response body as
+// {RequestQueueDumpDirectory}/{prog}-requests.json, replacing any
+// previous dump.
+//
+// It returns immediately in three cases:
+//   - RequestQueueDumpDirectory is empty.
+//   - ManagementToken is empty.
+//   - MaxConcurrentRequests is less than 1. Without a positive limit
+//     the computed threshold would be 0, so any in-flight request
+//     would trigger a dump.
+//
+// Otherwise it runs until c.ctx is done, or for the life of the
+// process if c.ctx is nil.
+//
+// Failures are logged and retried on the next tick.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+       outdir := cluster.SystemLogs.RequestQueueDumpDirectory
+       maxReqs := cluster.API.MaxConcurrentRequests
+       if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
+               return
+       }
+       threshold := maxReqs * 9 / 10
+       logger = logger.WithField("worker", "RequestQueueDump")
+       outfile := outdir + "/" + prog + "-requests.json"
+       // Write each dump to a temporary file and rename it into
+       // place, so readers never see a truncated or half-written
+       // file.
+       tmpfile := outfile + ".tmp"
+
+       // c.ctx is how tests shut the service down (see the command
+       // struct). When it is nil, done stays nil, and receiving from
+       // a nil channel blocks forever, so the loop only stops at
+       // process exit.
+       var done <-chan struct{}
+       if c.ctx != nil {
+               done = c.ctx.Done()
+       }
+       ticker := time.NewTicker(requestQueueDumpCheckInterval)
+       defer ticker.Stop()
+       for {
+               select {
+               case <-done:
+                       return
+               case <-ticker.C:
+               }
+               mfs, err := reg.Gather()
+               if err != nil {
+                       logger.WithError(err).Warn("error getting metrics")
+                       continue
+               }
+               dump := false
+               for _, mf := range mfs {
+                       if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
+                               n := int(mf.Metric[0].GetGauge().GetValue())
+                               if n > 0 && n >= threshold {
+                                       dump = true
+                                       break
+                               }
+                       }
+               }
+               if !dump {
+                       continue
+               }
+               req, err := http.NewRequest("GET", "/_inspect/requests", nil)
+               if err != nil {
+                       logger.WithError(err).Warn("error in http.NewRequest")
+                       continue
+               }
+               req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+               resp := httptest.NewRecorder()
+               srv.Handler.ServeHTTP(resp, req)
+               if code := resp.Result().StatusCode; code != http.StatusOK {
+                       logger.WithField("StatusCode", code).Warn("error getting /_inspect/requests")
+                       continue
+               }
+               // A JSON data file should be neither executable nor
+               // writable by other users.
+               err = os.WriteFile(tmpfile, resp.Body.Bytes(), 0644)
+               if err != nil {
+                       logger.WithError(err).Warn("error writing file")
+                       continue
+               }
+               err = os.Rename(tmpfile, outfile)
+               if err != nil {
+                       logger.WithError(err).Warn("error renaming file")
+               }
+       }
+}
+
 // If an incoming request's target vhost has an embedded collection
 // UUID or PDH, handle it with hTrue, otherwise handle it with
 // hFalse.
index 7db91092745e2e4886f0b1b35a3015da0f0387fc..22b8415137b76fafaa54319ec4c71f6df7e9a98c 100644 (file)
@@ -9,12 +9,14 @@ import (
        "bytes"
        "context"
        "crypto/tls"
+       "encoding/json"
        "fmt"
        "io/ioutil"
        "net"
        "net/http"
        "net/url"
        "os"
+       "strings"
        "testing"
        "time"
 
@@ -192,6 +194,102 @@ func (*Suite) TestCommand(c *check.C) {
        c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
 }
 
+// TestDumpRequests saturates API.MaxConcurrentRequests with requests
+// that block until released. It then checks that a request queue
+// dump appears in SystemLogs.RequestQueueDumpDirectory and lists the
+// in-progress request.
+func (*Suite) TestDumpRequests(c *check.C) {
+       defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
+       requestQueueDumpCheckInterval = time.Second / 10
+
+       tmpdir := c.MkDir()
+       cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(cf.Name())
+       defer cf.Close()
+
+       maxReqs := 1
+       _, err = fmt.Fprintf(cf, `
+Clusters:
+ zzzzz:
+  SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+  ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
+  API: {MaxConcurrentRequests: %d}
+  SystemLogs: {RequestQueueDumpDirectory: %q}
+  Services:
+   Controller:
+    ExternalURL: "http://localhost:12345"
+    InternalURLs: {"http://localhost:12345": {}}
+`, maxReqs, tmpdir)
+       c.Assert(err, check.IsNil)
+
+       started := make(chan bool, maxReqs+1)
+       hold := make(chan bool)
+       // release unblocks the test handler. It is deferred so that an
+       // early c.Fatal does not leave handlers, and the client
+       // goroutines waiting on them, blocked forever.
+       released := false
+       release := func() {
+               if !released {
+                       released = true
+                       close(hold)
+               }
+       }
+       defer release()
+       handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               started <- true
+               <-hold
+       })
+       healthCheck := make(chan bool, 1)
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+               return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
+       })
+       cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
+
+       exited := make(chan bool)
+       var stdin, stdout, stderr bytes.Buffer
+
+       go func() {
+               cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+               close(exited)
+       }()
+       select {
+       case <-healthCheck:
+       case <-exited:
+               c.Logf("%s", stderr.String())
+               c.Fatal("command exited without health check")
+       }
+
+       // Send one more request than the limit allows to run at once,
+       // so the limit stays saturated while the handler holds them.
+       client := http.Client{Timeout: 10 * time.Second}
+       deadline := time.Now().Add(time.Second * 2)
+       clientDone := make(chan bool, maxReqs+1)
+       for i := 0; i < maxReqs+1; i++ {
+               go func() {
+                       defer func() { clientDone <- true }()
+                       resp, err := client.Get("http://localhost:12345/testpath")
+                       for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
+                               time.Sleep(time.Second / 100)
+                               resp, err = client.Get("http://localhost:12345/testpath")
+                       }
+                       if c.Check(err, check.IsNil) {
+                               c.Logf("resp StatusCode %d", resp.StatusCode)
+                               resp.Body.Close()
+                       }
+               }()
+       }
+       for i := 0; i < maxReqs; i++ {
+               select {
+               case <-started:
+               case <-time.After(time.Second):
+                       c.Logf("%s", stderr.String())
+                       c.Fatal("timed out waiting for requests to start")
+               }
+       }
+
+       // Wait for the dump file to appear, or for the deadline to pass.
+       var j []byte
+       for {
+               j, err = os.ReadFile(tmpdir + "/arvados-controller-requests.json")
+               if !os.IsNotExist(err) || !deadline.After(time.Now()) {
+                       break
+               }
+               time.Sleep(time.Second / 100)
+       }
+       c.Logf("%s", stderr.String())
+       c.Logf("%s", string(j))
+       if c.Check(err, check.IsNil) {
+               var loaded []struct{ URL string }
+               err = json.Unmarshal(j, &loaded)
+               c.Check(err, check.IsNil)
+               // Guard the index so a short result fails the check
+               // instead of panicking.
+               if c.Check(loaded, check.HasLen, maxReqs) {
+                       c.Check(loaded[0].URL, check.Equals, "/testpath")
+               }
+       }
+
+       // Release the held requests. Then make sure every client
+       // goroutine finishes, including its checks, before the test
+       // returns.
+       release()
+       for i := 0; i < maxReqs+1; i++ {
+               select {
+               case <-clientDone:
+               case <-time.After(10 * time.Second):
+                       c.Fatal("timed out waiting for client requests to finish")
+               }
+       }
+       cancel()
+}
+
 func (*Suite) TestTLS(c *check.C) {
        cwd, err := os.Getwd()
        c.Assert(err, check.IsNil)
index 01e9902c8663938d1b1683654bf84fa867d3f8ae..ee0e805134e2f5ff54f89083610a70cb6ffbff63 100644 (file)
@@ -221,9 +221,10 @@ type Cluster struct {
                EmailFrom                      string
        }
        SystemLogs struct {
-               LogLevel                string
-               Format                  string
-               MaxRequestLogParamsSize int
+               LogLevel                  string
+               Format                    string
+               MaxRequestLogParamsSize   int
+               RequestQueueDumpDirectory string // directory for request queue dumps; "" disables (see SystemLogs in lib/config/config.default.yml)
        }
        TLS struct {
                Certificate string