Merge branch '21678-installer-diagnostics-internal'. Closes #21678
[arvados.git] / lib / service / cmd.go
index 20441c2a6c4534eb697a85bfc4c369e64ae0aad9..9ed0acfb8f7151e805c7269dceacb696f425c07b 100644
@@ -12,10 +12,13 @@ import (
        "io"
        "net"
        "net/http"
+       "net/http/httptest"
        _ "net/http/pprof"
        "net/url"
        "os"
+       "regexp"
        "strings"
+       "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/config"
@@ -45,6 +48,8 @@ type command struct {
        ctx        context.Context // enables tests to shutdown service; no public API yet
 }
 
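+// requestQueueDumpCheckInterval is how often requestQueueDumpCheck
+// polls the request metrics.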
+var requestQueueDumpCheckInterval = time.Minute
+
 // Command returns a cmd.Handler that loads site config, calls
 // newHandler with the current cluster and node configs, and brings up
 // an http server with the returned handler.
@@ -75,9 +80,9 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        loader := config.NewLoader(stdin, log)
        loader.SetupFlags(flags)
 
-       // prog is [keepstore, keep-web, git-httpd, ...]  but the
+       // prog is [keepstore, keep-web, ...]  but the
        // legacy config flags are [-legacy-keepstore-config,
-       // -legacy-keepweb-config, -legacy-git-httpd-config, ...]
+       // -legacy-keepweb-config, ...]
        legacyFlag := "-legacy-" + strings.Replace(prog, "keep-", "keep", 1) + "-config"
        args = loader.MungeLegacyConfigArgs(log, args, legacyFlag)
 
@@ -150,7 +155,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                                httpserver.Inspect(reg, cluster.ManagementToken,
                                        httpserver.LogRequests(
                                                interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-                                                       httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
+                                                       c.requestLimiter(handler, cluster, reg)))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -189,6 +194,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                <-handler.Done()
                srv.Close()
        }()
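+       // In the background, periodically dump request queue
+       // diagnostics to a file if any queue is near capacity
+       // (see requestQueueDumpCheck).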
+       go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
        err = srv.Wait()
        if err != nil {
                return 1
@@ -196,6 +202,153 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        return 0
 }
 
+// requestQueueDumpCheck monitors the server's incoming HTTP request
+// limiters, provided SystemLogs.RequestQueueDumpDirectory is set.
+// Whenever the number of concurrent requests in any queue ("api" or
+// "tunnel") reaches 90% of its maximum slots, it writes the
+// /_inspect/requests data to a JSON file in the configured directory.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+       outdir := cluster.SystemLogs.RequestQueueDumpDirectory
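+       // A dump requires both an output directory and a management
+       // token (the /_inspect/requests endpoint rejects requests
+       // without one).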
+       if outdir == "" || cluster.ManagementToken == "" {
+               return
+       }
+       logger = logger.WithField("worker", "RequestQueueDump")
+       outfile := outdir + "/" + prog + "-requests.json"
+       for range time.NewTicker(requestQueueDumpCheckInterval).C {
+               mfs, err := reg.Gather()
+               if err != nil {
+                       logger.WithError(err).Warn("error getting metrics")
+                       continue
+               }
+               cur := map[string]int{} // queue label => current
+               max := map[string]int{} // queue label => max
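+               // Collect current and maximum concurrency for each
+               // queue label from the arvados_concurrent_requests
+               // and arvados_max_concurrent_requests gauges.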
+               for _, mf := range mfs {
+                       for _, m := range mf.GetMetric() {
+                               for _, ml := range m.GetLabel() {
+                                       if ml.GetName() == "queue" {
+                                               n := int(m.GetGauge().GetValue())
+                                               if name := mf.GetName(); name == "arvados_concurrent_requests" {
+                                                       cur[ml.GetValue()] = n
+                                               } else if name == "arvados_max_concurrent_requests" {
+                                                       max[ml.GetValue()] = n
+                                               }
+                                       }
+                               }
+                       }
+               }
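+               // Dump if any queue is at or above 90% of its
+               // capacity (integer division rounds the threshold
+               // down).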
+               dump := false
+               for queue, n := range cur {
+                       if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 {
+                               dump = true
+                               break
+                       }
+               }
+               if dump {
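+                       // Fetch /_inspect/requests by calling our
+                       // own handler in-process via an
+                       // httptest.ResponseRecorder, rather than
+                       // making a network request.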
+                       req, err := http.NewRequest("GET", "/_inspect/requests", nil)
+                       if err != nil {
+                               logger.WithError(err).Warn("error in http.NewRequest")
+                               continue
+                       }
+                       req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+                       resp := httptest.NewRecorder()
+                       srv.Handler.ServeHTTP(resp, req)
+                       if code := resp.Result().StatusCode; code != http.StatusOK {
+                               logger.WithField("StatusCode", code).Warn("error getting /_inspect/requests")
+                               continue
+                       }
+                       err = os.WriteFile(outfile, resp.Body.Bytes(), 0666)
+                       if err != nil {
+                               logger.WithError(err).WithField("file", outfile).Warn("error writing request dump file")
+                               continue
+                       }
+               }
+       }
+}
+
+// requestLimiter sets up an httpserver.RequestLimiter with separate
+// queues/streams for API requests (obeying MaxConcurrentRequests
+// etc.) and gateway tunnel requests (obeying MaxGatewayTunnels).
+func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
+       maxReqs := cluster.API.MaxConcurrentRequests
+       if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+               (maxRails < maxReqs || maxReqs == 0) &&
+               c.svcName == arvados.ServiceNameController {
+               // Ideally, we would accept up to
+               // MaxConcurrentRequests, and apply the
+               // MaxConcurrentRailsRequests limit only for requests
+               // that require calling upstream to RailsAPI. But for
+               // now we make the simplifying assumption that every
+               // controller request causes an upstream RailsAPI
+               // request.
+               maxReqs = maxRails
+       }
+       rqAPI := &httpserver.RequestQueue{
+               Label:                      "api",
+               MaxConcurrent:              maxReqs,
+               MaxQueue:                   cluster.API.MaxQueuedRequests,
+               MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+       }
+       rqTunnel := &httpserver.RequestQueue{
+               Label:         "tunnel",
+               MaxConcurrent: cluster.API.MaxGatewayTunnels,
+               MaxQueue:      0,
+       }
+       return &httpserver.RequestLimiter{
+               Handler:  handler,
+               Priority: c.requestPriority,
+               Registry: reg,
+               Queue: func(req *http.Request) *httpserver.RequestQueue {
+                       if req.Method == http.MethodPost && reTunnelPath.MatchString(req.URL.Path) {
+                               return rqTunnel
+                       }
+                       return rqAPI
+               },
+       }
+}
+
+// reTunnelPath matches paths of API endpoints that go in the "tunnel"
+// queue.
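+//
+// Each endpoint's path template is quoted as a literal, with each
+// {...} path variable matching a single path segment. For example, a
+// template like "arvados/v1/connect/{uuid}/ssh" would compile to
+// `\Q/arvados/v1/connect/\E[^/]*\Q/ssh\E`.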
+var reTunnelPath = regexp.MustCompile(func() string {
+       rePathVar := regexp.MustCompile(`{.*?}`)
+       out := ""
+       for _, endpoint := range []arvados.APIEndpoint{
+               arvados.EndpointContainerGatewayTunnel,
+               arvados.EndpointContainerGatewayTunnelCompat,
+               arvados.EndpointContainerSSH,
+               arvados.EndpointContainerSSHCompat,
+       } {
+               if out != "" {
+                       out += "|"
+               }
+               out += `\Q/` + rePathVar.ReplaceAllString(endpoint.Path, `\E[^/]*\Q`) + `\E`
+       }
+       return "^(" + out + ")$"
+}())
+
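+// requestPriority returns the RequestLimiter priority for an
+// incoming request: container lock requests get MinPriority
+// (rejected rather than queued when at capacity), log creation gets
+// low priority, and interactive (cross-origin) requests get high
+// priority.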
+func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
+       switch {
+       case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
+               // Return 503 immediately instead of queueing. We want
+               // to send feedback to dispatchcloud ASAP to stop
+               // bringing up new containers.
+               return httpserver.MinPriority
+       case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
+               // "Create log entry" is the most harmless kind of
+               // request to drop. Negative priority is called "low"
+               // in aggregate metrics.
+               return -1
+       case req.Header.Get("Origin") != "":
+               // Handle interactive requests first. Positive
+               // priority is called "high" in aggregate metrics.
+               return 1
+       default:
+               // Zero priority is called "normal" in aggregate
+               // metrics.
+               return 0
+       }
+}
+
 // If an incoming request's target vhost has an embedded collection
 // UUID or PDH, handle it with hTrue, otherwise handle it with
 // hFalse.