return 1
}
- maxReqs := cluster.API.MaxConcurrentRequests
- if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
- (maxRails < maxReqs || maxReqs == 0) &&
- strings.HasSuffix(prog, "controller") {
- // Ideally, we would accept up to
- // MaxConcurrentRequests, and apply the
- // MaxConcurrentRailsRequests limit only for requests
- // that require calling upstream to RailsAPI. But for
- // now we make the simplifying assumption that every
- // controller request causes an upstream RailsAPI
- // request.
- maxReqs = maxRails
- }
instrumented := httpserver.Instrument(reg, log,
httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
httpserver.AddRequestIDs(
httpserver.Inspect(reg, cluster.ManagementToken,
httpserver.LogRequests(
interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
- &httpserver.RequestLimiter{
- Handler: handler,
- MaxConcurrent: maxReqs,
- MaxQueue: cluster.API.MaxQueuedRequests,
- MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
- Priority: c.requestPriority,
- Registry: reg}))))))
+ c.requestLimiter(handler, cluster, reg)))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
<-handler.Done()
srv.Close()
}()
- go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
+ go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
err = srv.Wait()
if err != nil {
return 1
}
// If SystemLogs.RequestQueueDumpDirectory is set, monitor the
-// server's incoming HTTP request queue size. When it exceeds 90% of
-// API.MaxConcurrentRequests, write the /_inspect/requests data to a
-// JSON file in the specified directory.
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+// server's incoming HTTP request limiters. When the number of
+// concurrent requests in any queue ("api" or "tunnel") reaches 90% of
+// its maximum slots, write the /_inspect/requests data to a JSON file
+// in the specified directory.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
outdir := cluster.SystemLogs.RequestQueueDumpDirectory
- if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
+ if outdir == "" || cluster.ManagementToken == "" {
return
}
logger = logger.WithField("worker", "RequestQueueDump")
logger.WithError(err).Warn("error getting metrics")
continue
}
- dump := false
+ cur := map[string]int{} // queue label => current
+ max := map[string]int{} // queue label => max
for _, mf := range mfs {
- if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
- n := int(mf.Metric[0].GetGauge().GetValue())
- if n > 0 && n >= maxReqs*9/10 {
- dump = true
- break
+ for _, m := range mf.GetMetric() {
+ for _, ml := range m.GetLabel() {
+ if ml.GetName() == "queue" {
+ n := int(m.GetGauge().GetValue())
+ if name := mf.GetName(); name == "arvados_concurrent_requests" {
+ cur[*ml.Value] = n
+ } else if name == "arvados_max_concurrent_requests" {
+ max[*ml.Value] = n
+ }
+ }
}
}
}
+ dump := false
+ for queue, n := range cur {
+ if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 {
+ dump = true
+ break
+ }
+ }
if dump {
req, err := http.NewRequest("GET", "/_inspect/requests", nil)
if err != nil {
}
}
+// requestLimiter wraps handler in an httpserver.RequestLimiter that
+// dispatches every request to one of two separate request queues:
+//
+//   - "tunnel": requests whose path starts with /arvados/v1/connect/,
+//     limited by API.MaxGatewayTunnels;
+//   - "api": everything else, limited by API.MaxConcurrentRequests,
+//     API.MaxQueuedRequests and API.MaxQueueTimeForLockRequests.
+//
+// Request priority is delegated to c.requestPriority, and reg is
+// handed to the limiter as its Registry.
+func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
+	apiSlots := cluster.API.MaxConcurrentRequests
+	railsSlots := cluster.API.MaxConcurrentRailsRequests
+	isController := c.svcName == arvados.ServiceNameController
+	if isController && railsSlots > 0 && (apiSlots == 0 || railsSlots < apiSlots) {
+		// In principle the controller could admit up to
+		// MaxConcurrentRequests and enforce
+		// MaxConcurrentRailsRequests only on the requests
+		// that actually go upstream to RailsAPI. Until that
+		// distinction is implemented, treat every controller
+		// request as if it triggers a RailsAPI request and
+		// use the tighter limit.
+		apiSlots = railsSlots
+	}
+
+	apiQueue := &httpserver.RequestQueue{
+		Label:                      "api",
+		MaxConcurrent:              apiSlots,
+		MaxQueue:                   cluster.API.MaxQueuedRequests,
+		MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+	}
+	tunnelQueue := &httpserver.RequestQueue{
+		Label:         "tunnel",
+		MaxConcurrent: cluster.API.MaxGatewayTunnels,
+		MaxQueue:      0,
+	}
+
+	// Choose the queue by URL path: gateway tunnel endpoints get
+	// their own queue, all other requests share the API queue.
+	pickQueue := func(req *http.Request) *httpserver.RequestQueue {
+		if strings.HasPrefix(req.URL.Path, "/arvados/v1/connect/") {
+			return tunnelQueue
+		}
+		return apiQueue
+	}
+
+	return &httpserver.RequestLimiter{
+		Handler:  handler,
+		Priority: c.requestPriority,
+		Registry: reg,
+		Queue:    pickQueue,
+	}
+}
+
func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
switch {
case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):