return 1
}
+ maxReqs := cluster.API.MaxConcurrentRequests
+ if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+ (maxRails < maxReqs || maxReqs == 0) &&
+ strings.HasSuffix(prog, "controller") {
+ // Ideally, we would accept up to
+ // MaxConcurrentRequests, and apply the
+ // MaxConcurrentRailsRequests limit only for requests
+ // that require calling upstream to RailsAPI. But for
+ // now we make the simplifying assumption that every
+ // controller request causes an upstream RailsAPI
+ // request.
+ maxReqs = maxRails
+ }
instrumented := httpserver.Instrument(reg, log,
httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
httpserver.AddRequestIDs(
httpserver.LogRequests(
interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
&httpserver.RequestLimiter{
- Handler: handler,
- MaxConcurrent: cluster.API.MaxConcurrentRequests,
- MaxQueue: cluster.API.MaxQueuedRequests,
- Priority: c.requestPriority,
- Registry: reg}))))))
+ Handler: handler,
+ MaxConcurrent: maxReqs,
+ MaxQueue: cluster.API.MaxQueuedRequests,
+ MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+ Priority: c.requestPriority,
+ Registry: reg}))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
<-handler.Done()
srv.Close()
}()
- go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
+ go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
err = srv.Wait()
if err != nil {
return 1
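// server's incoming HTTP request queue size. When it reaches 90% of
// the effective concurrent request limit (maxReqs, which the caller
// derives from API.MaxConcurrentRequests and, for controller,
// API.MaxConcurrentRailsRequests), write the /_inspect/requests data
// to a JSON file in the specified directory.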
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
outdir := cluster.SystemLogs.RequestQueueDumpDirectory
- if outdir == "" || cluster.ManagementToken == "" || cluster.API.MaxConcurrentRequests < 1 {
+ if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
return
}
logger = logger.WithField("worker", "RequestQueueDump")
for _, mf := range mfs {
if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
n := int(mf.Metric[0].GetGauge().GetValue())
- if n > 0 && n >= cluster.API.MaxConcurrentRequests*9/10 {
+ if n > 0 && n >= maxReqs*9/10 {
dump = true
break
}
// Return 503 immediately instead of queueing. We want
// to send feedback to dispatchcloud ASAP to stop
// bringing up new containers.
- return httpserver.IneligibleForQueuePriority
+ return httpserver.MinPriority
case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
// "Create log entry" is the most harmless kind of
- // request to drop.
- return 0
+ // request to drop. Negative priority is called "low"
+ // in aggregate metrics.
+ return -1
case req.Header.Get("Origin") != "":
- // Handle interactive requests first.
- return 2
- default:
+ // Handle interactive requests first. Positive
+ // priority is called "high" in aggregate metrics.
return 1
+ default:
+ // Zero priority is called "normal" in aggregate
+ // metrics.
+ return 0
}
}