Merge branch '21124-max-rails-reqs'
[arvados.git] / lib / service / cmd.go
index c66c279432dc2f0ae6d5777f6776a1c2a3d063fb..725f86f3bda5c2a82476615ba9ecd6e7a9b7a4fa 100644 (file)
@@ -148,6 +148,19 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                return 1
        }
 
+       maxReqs := cluster.API.MaxConcurrentRequests
+       if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+               (maxRails < maxReqs || maxReqs == 0) &&
+               strings.HasSuffix(prog, "controller") {
+               // Ideally, we would accept up to
+               // MaxConcurrentRequests, and apply the
+               // MaxConcurrentRailsRequests limit only for requests
+               // that require calling upstream to RailsAPI. But for
+               // now we make the simplifying assumption that every
+               // controller request causes an upstream RailsAPI
+               // request.
+               maxReqs = maxRails
+       }
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
@@ -155,11 +168,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                                        httpserver.LogRequests(
                                                interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
                                                        &httpserver.RequestLimiter{
-                                                               Handler:       handler,
-                                                               MaxConcurrent: cluster.API.MaxConcurrentRequests,
-                                                               MaxQueue:      cluster.API.MaxQueuedRequests,
-                                                               Priority:      c.requestPriority,
-                                                               Registry:      reg}))))))
+                                                               Handler:                    handler,
+                                                               MaxConcurrent:              maxReqs,
+                                                               MaxQueue:                   cluster.API.MaxQueuedRequests,
+                                                               MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+                                                               Priority:                   c.requestPriority,
+                                                               Registry:                   reg}))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -198,7 +212,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                <-handler.Done()
                srv.Close()
        }()
-       go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
+       go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
        err = srv.Wait()
        if err != nil {
                return 1
@@ -210,9 +224,9 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 // server's incoming HTTP request queue size. When it reaches 90% of
 // the effective concurrent request limit (maxReqs, which may differ
 // from API.MaxConcurrentRequests), write the /_inspect/requests data
 // to a JSON file in the specified directory.
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
        outdir := cluster.SystemLogs.RequestQueueDumpDirectory
-       if outdir == "" || cluster.ManagementToken == "" || cluster.API.MaxConcurrentRequests < 1 {
+       if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
                return
        }
        logger = logger.WithField("worker", "RequestQueueDump")
@@ -227,7 +241,7 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, r
                for _, mf := range mfs {
                        if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
                                n := int(mf.Metric[0].GetGauge().GetValue())
-                               if n > 0 && n >= cluster.API.MaxConcurrentRequests*9/10 {
+                               if n > 0 && n >= maxReqs*9/10 {
                                        dump = true
                                        break
                                }
@@ -261,16 +275,20 @@ func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
                // Return 503 immediately instead of queueing. We want
                // to send feedback to dispatchcloud ASAP to stop
                // bringing up new containers.
-               return httpserver.IneligibleForQueuePriority
+               return httpserver.MinPriority
        case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
                // "Create log entry" is the most harmless kind of
-               // request to drop.
-               return 0
+               // request to drop. Negative priority is called "low"
+               // in aggregate metrics.
+               return -1
        case req.Header.Get("Origin") != "":
-               // Handle interactive requests first.
-               return 2
-       default:
+               // Handle interactive requests first. Positive
+               // priority is called "high" in aggregate metrics.
                return 1
+       default:
+               // Zero priority is called "normal" in aggregate
+               // metrics.
+               return 0
        }
 }