21285: Use separate request limit/queue for gateway tunnel requests.
[arvados.git] / lib / service / cmd.go
index 725f86f3bda5c2a82476615ba9ecd6e7a9b7a4fa..e40b47acbbb6d4bb1b96a9de1bf2d625e28b2430 100644 (file)
@@ -148,32 +148,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                return 1
        }
 
-       maxReqs := cluster.API.MaxConcurrentRequests
-       if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
-               (maxRails < maxReqs || maxReqs == 0) &&
-               strings.HasSuffix(prog, "controller") {
-               // Ideally, we would accept up to
-               // MaxConcurrentRequests, and apply the
-               // MaxConcurrentRailsRequests limit only for requests
-               // that require calling upstream to RailsAPI. But for
-               // now we make the simplifying assumption that every
-               // controller request causes an upstream RailsAPI
-               // request.
-               maxReqs = maxRails
-       }
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
                                httpserver.Inspect(reg, cluster.ManagementToken,
                                        httpserver.LogRequests(
                                                interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-                                                       &httpserver.RequestLimiter{
-                                                               Handler:                    handler,
-                                                               MaxConcurrent:              maxReqs,
-                                                               MaxQueue:                   cluster.API.MaxQueuedRequests,
-                                                               MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
-                                                               Priority:                   c.requestPriority,
-                                                               Registry:                   reg}))))))
+                                                       c.requestLimiter(handler, cluster, reg)))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -212,7 +193,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                <-handler.Done()
                srv.Close()
        }()
-       go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
+       go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
        err = srv.Wait()
        if err != nil {
                return 1
@@ -221,12 +202,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 }
 
 // If SystemLogs.RequestQueueDumpDirectory is set, monitor the
-// server's incoming HTTP request queue size. When it exceeds 90% of
-// API.MaxConcurrentRequests, write the /_inspect/requests data to a
-// JSON file in the specified directory.
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+// server's incoming HTTP request limiters. When the number of
+// concurrent requests in any queue ("api" or "tunnel") exceeds 90% of
+// its maximum slots, write the /_inspect/requests data to a JSON file
+// in the specified directory.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
        outdir := cluster.SystemLogs.RequestQueueDumpDirectory
-       if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
+       if outdir == "" || cluster.ManagementToken == "" {
                return
        }
        logger = logger.WithField("worker", "RequestQueueDump")
@@ -237,16 +219,29 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p
                        logger.WithError(err).Warn("error getting metrics")
                        continue
                }
-               dump := false
+               cur := map[string]int{} // queue label => current
+               max := map[string]int{} // queue label => max
                for _, mf := range mfs {
-                       if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
-                               n := int(mf.Metric[0].GetGauge().GetValue())
-                               if n > 0 && n >= maxReqs*9/10 {
-                                       dump = true
-                                       break
+                       for _, m := range mf.GetMetric() {
+                               for _, ml := range m.GetLabel() {
+                                       if ml.GetName() == "queue" {
+                                               n := int(m.GetGauge().GetValue())
+                                               if name := mf.GetName(); name == "arvados_concurrent_requests" {
+                                                       cur[*ml.Value] = n
+                                               } else if name == "arvados_max_concurrent_requests" {
+                                                       max[*ml.Value] = n
+                                               }
+                                       }
                                }
                        }
                }
+               dump := false
+               for queue, n := range cur {
+                       if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 {
+                               dump = true
+                               break
+                       }
+               }
                if dump {
                        req, err := http.NewRequest("GET", "/_inspect/requests", nil)
                        if err != nil {
@@ -269,6 +264,48 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p
        }
 }
 
+// Set up a httpserver.RequestLimiter with separate queues/streams for
+// API requests (obeying MaxConcurrentRequests etc) and gateway tunnel
+// requests (obeying MaxGatewayTunnels).
+func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
+       maxReqs := cluster.API.MaxConcurrentRequests
+       if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+               (maxRails < maxReqs || maxReqs == 0) &&
+               c.svcName == arvados.ServiceNameController {
+               // Ideally, we would accept up to
+               // MaxConcurrentRequests, and apply the
+               // MaxConcurrentRailsRequests limit only for requests
+               // that require calling upstream to RailsAPI. But for
+               // now we make the simplifying assumption that every
+               // controller request causes an upstream RailsAPI
+               // request.
+               maxReqs = maxRails
+       }
+       rqAPI := &httpserver.RequestQueue{
+               Label:                      "api",
+               MaxConcurrent:              maxReqs,
+               MaxQueue:                   cluster.API.MaxQueuedRequests,
+               MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+       }
+       rqTunnel := &httpserver.RequestQueue{
+               Label:         "tunnel",
+               MaxConcurrent: cluster.API.MaxGatewayTunnels,
+               MaxQueue:      0,
+       }
+       return &httpserver.RequestLimiter{
+               Handler:  handler,
+               Priority: c.requestPriority,
+               Registry: reg,
+               Queue: func(req *http.Request) *httpserver.RequestQueue {
+                       if strings.HasPrefix(req.URL.Path, "/arvados/v1/connect/") {
+                               return rqTunnel
+                       } else {
+                               return rqAPI
+                       }
+               },
+       }
+}
+
 func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
        switch {
        case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):