Merge branch '21124-max-rails-reqs'
[arvados.git] / lib / service / cmd.go
index 1073f421dfa915e54dcb38ff9e3a02ecf7107413..725f86f3bda5c2a82476615ba9ecd6e7a9b7a4fa 100644 (file)
@@ -148,6 +148,19 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                return 1
        }
 
+       maxReqs := cluster.API.MaxConcurrentRequests
+       if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+               (maxRails < maxReqs || maxReqs == 0) &&
+               strings.HasSuffix(prog, "controller") {
+               // Ideally, we would accept up to
+               // MaxConcurrentRequests, and apply the
+               // MaxConcurrentRailsRequests limit only for requests
+               // that require calling upstream to RailsAPI. But for
+               // now we make the simplifying assumption that every
+               // controller request causes an upstream RailsAPI
+               // request.
+               maxReqs = maxRails
+       }
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
@@ -155,10 +168,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                                        httpserver.LogRequests(
                                                interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
                                                        &httpserver.RequestLimiter{
-                                                               Handler:       handler,
-                                                               MaxConcurrent: cluster.API.MaxConcurrentRequests,
-                                                               MaxQueue:      cluster.API.MaxQueuedRequests,
-                                                               Registry:      reg}))))))
+                                                               Handler:                    handler,
+                                                               MaxConcurrent:              maxReqs,
+                                                               MaxQueue:                   cluster.API.MaxQueuedRequests,
+                                                               MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+                                                               Priority:                   c.requestPriority,
+                                                               Registry:                   reg}))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -197,7 +212,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                <-handler.Done()
                srv.Close()
        }()
-       go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
+       go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
        err = srv.Wait()
        if err != nil {
                return 1
@@ -209,9 +224,9 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 // server's incoming HTTP request queue size. When it exceeds 90% of
 // API.MaxConcurrentRequests, write the /_inspect/requests data to a
 // JSON file in the specified directory.
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
        outdir := cluster.SystemLogs.RequestQueueDumpDirectory
-       if outdir == "" || cluster.ManagementToken == "" {
+       if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
                return
        }
        logger = logger.WithField("worker", "RequestQueueDump")
@@ -226,7 +241,7 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, r
                for _, mf := range mfs {
                        if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
                                n := int(mf.Metric[0].GetGauge().GetValue())
-                               if n > 0 && n >= cluster.API.MaxConcurrentRequests*9/10 {
+                               if n > 0 && n >= maxReqs*9/10 {
                                        dump = true
                                        break
                                }
@@ -254,6 +269,29 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, r
        }
 }
 
+func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
+       switch {
+       case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
+               // Return 503 immediately instead of queueing. We want
+               // to send feedback to dispatchcloud ASAP to stop
+               // bringing up new containers.
+               return httpserver.MinPriority
+       case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
+               // "Create log entry" is the most harmless kind of
+               // request to drop. Negative priority is called "low"
+               // in aggregate metrics.
+               return -1
+       case req.Header.Get("Origin") != "":
+               // Handle interactive requests first. Positive
+               // priority is called "high" in aggregate metrics.
+               return 1
+       default:
+               // Zero priority is called "normal" in aggregate
+               // metrics.
+               return 0
+       }
+}
+
 // If an incoming request's target vhost has an embedded collection
 // UUID or PDH, handle it with hTrue, otherwise handle it with
 // hFalse.