"io"
"net"
"net/http"
+ "net/http/httptest"
_ "net/http/pprof"
"net/url"
"os"
+ "regexp"
"strings"
+ "time"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
ctx context.Context // enables tests to shutdown service; no public API yet
}
// requestQueueDumpCheckInterval is the period of the ticker that
// drives requestQueueDumpCheck, i.e., how often the request queue
// gauges are examined.
// NOTE(review): presumably a variable rather than a constant so that
// tests can shorten it -- confirm.
+var requestQueueDumpCheckInterval = time.Minute
+
// Command returns a cmd.Handler that loads site config, calls
// newHandler with the current cluster and node configs, and brings up
// an http server with the returned handler.
loader := config.NewLoader(stdin, log)
loader.SetupFlags(flags)
- // prog is [keepstore, keep-web, git-httpd, ...] but the
+ // prog is [keepstore, keep-web, ...] but the
// legacy config flags are [-legacy-keepstore-config,
- // -legacy-keepweb-config, -legacy-git-httpd-config, ...]
+ // -legacy-keepweb-config, ...]
legacyFlag := "-legacy-" + strings.Replace(prog, "keep-", "keep", 1) + "-config"
args = loader.MungeLegacyConfigArgs(log, args, legacyFlag)
instrumented := httpserver.Instrument(reg, log,
httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
httpserver.AddRequestIDs(
- httpserver.LogRequests(
- interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
- httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+ httpserver.Inspect(reg, cluster.ManagementToken,
+ httpserver.LogRequests(
+ interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+ c.requestLimiter(handler, cluster, reg)))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
Addr: listenURL.Host,
}
if listenURL.Scheme == "https" || listenURL.Scheme == "wss" {
- tlsconfig, err := tlsConfigWithCertUpdater(cluster, logger)
+ tlsconfig, err := makeTLSConfig(cluster, logger)
if err != nil {
logger.WithError(err).Errorf("cannot start %s service on %s", c.svcName, listenURL.String())
return 1
<-handler.Done()
srv.Close()
}()
+ go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
err = srv.Wait()
if err != nil {
return 1
return 0
}
+// If SystemLogs.RequestQueueDumpDirectory is set, monitor the
+// server's incoming HTTP request limiters. When the number of
+// concurrent requests in any queue ("api" or "tunnel") exceeds 90% of
+// its maximum slots, write the /_inspect/requests data to a JSON file
+// in the specified directory.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+ outdir := cluster.SystemLogs.RequestQueueDumpDirectory
+ if outdir == "" || cluster.ManagementToken == "" {
+ return
+ }
+ logger = logger.WithField("worker", "RequestQueueDump")
+ outfile := outdir + "/" + prog + "-requests.json"
+ for range time.NewTicker(requestQueueDumpCheckInterval).C {
+ mfs, err := reg.Gather()
+ if err != nil {
+ logger.WithError(err).Warn("error getting metrics")
+ continue
+ }
+ cur := map[string]int{} // queue label => current
+ max := map[string]int{} // queue label => max
+ for _, mf := range mfs {
+ for _, m := range mf.GetMetric() {
+ for _, ml := range m.GetLabel() {
+ if ml.GetName() == "queue" {
+ n := int(m.GetGauge().GetValue())
+ if name := mf.GetName(); name == "arvados_concurrent_requests" {
+ cur[*ml.Value] = n
+ } else if name == "arvados_max_concurrent_requests" {
+ max[*ml.Value] = n
+ }
+ }
+ }
+ }
+ }
+ dump := false
+ for queue, n := range cur {
+ if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 {
+ dump = true
+ break
+ }
+ }
+ if dump {
+ req, err := http.NewRequest("GET", "/_inspect/requests", nil)
+ if err != nil {
+ logger.WithError(err).Warn("error in http.NewRequest")
+ continue
+ }
+ req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+ resp := httptest.NewRecorder()
+ srv.Handler.ServeHTTP(resp, req)
+ if code := resp.Result().StatusCode; code != http.StatusOK {
+ logger.WithField("StatusCode", code).Warn("error getting /_inspect/requests")
+ continue
+ }
+ err = os.WriteFile(outfile, resp.Body.Bytes(), 0777)
+ if err != nil {
+ logger.WithError(err).Warn("error writing file")
+ continue
+ }
+ }
+ }
+}
+
+// Set up a httpserver.RequestLimiter with separate queues/streams for
+// API requests (obeying MaxConcurrentRequests etc) and gateway tunnel
+// requests (obeying MaxGatewayTunnels).
+func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
+ maxReqs := cluster.API.MaxConcurrentRequests
+ if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+ (maxRails < maxReqs || maxReqs == 0) &&
+ c.svcName == arvados.ServiceNameController {
+ // Ideally, we would accept up to
+ // MaxConcurrentRequests, and apply the
+ // MaxConcurrentRailsRequests limit only for requests
+ // that require calling upstream to RailsAPI. But for
+ // now we make the simplifying assumption that every
+ // controller request causes an upstream RailsAPI
+ // request.
+ maxReqs = maxRails
+ }
+ rqAPI := &httpserver.RequestQueue{
+ Label: "api",
+ MaxConcurrent: maxReqs,
+ MaxQueue: cluster.API.MaxQueuedRequests,
+ MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+ }
+ rqTunnel := &httpserver.RequestQueue{
+ Label: "tunnel",
+ MaxConcurrent: cluster.API.MaxGatewayTunnels,
+ MaxQueue: 0,
+ }
+ return &httpserver.RequestLimiter{
+ Handler: handler,
+ Priority: c.requestPriority,
+ Registry: reg,
+ Queue: func(req *http.Request) *httpserver.RequestQueue {
+ if req.Method == http.MethodPost && reTunnelPath.MatchString(req.URL.Path) {
+ return rqTunnel
+ } else {
+ return rqAPI
+ }
+ },
+ }
+}
+
+// reTunnelPath matches paths of API endpoints that go in the "tunnel"
+// queue.
+var reTunnelPath = regexp.MustCompile(func() string {
+ rePathVar := regexp.MustCompile(`{.*?}`)
+ out := ""
+ for _, endpoint := range []arvados.APIEndpoint{
+ arvados.EndpointContainerGatewayTunnel,
+ arvados.EndpointContainerGatewayTunnelCompat,
+ arvados.EndpointContainerSSH,
+ arvados.EndpointContainerSSHCompat,
+ } {
+ if out != "" {
+ out += "|"
+ }
+ out += `\Q/` + rePathVar.ReplaceAllString(endpoint.Path, `\E[^/]*\Q`) + `\E`
+ }
+ return "^(" + out + ")$"
+}())
+
+func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
+ switch {
+ case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
+ // Return 503 immediately instead of queueing. We want
+ // to send feedback to dispatchcloud ASAP to stop
+ // bringing up new containers.
+ return httpserver.MinPriority
+ case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
+ // "Create log entry" is the most harmless kind of
+ // request to drop. Negative priority is called "low"
+ // in aggregate metrics.
+ return -1
+ case req.Header.Get("Origin") != "":
+ // Handle interactive requests first. Positive
+ // priority is called "high" in aggregate metrics.
+ return 1
+ default:
+ // Zero priority is called "normal" in aggregate
+ // metrics.
+ return 0
+ }
+}
+
// If an incoming request's target vhost has an embedded collection
// UUID or PDH, handle it with hTrue, otherwise handle it with
// hFalse.
// intermediate proxy/routing)
listenURL = internalURL
}
- listener, err := net.Listen("tcp", listenURL.Host)
+ listenAddr := listenURL.Host
+ if _, _, err := net.SplitHostPort(listenAddr); err != nil {
+ // url "https://foo.example/" (with no
+ // explicit port name/number) means listen on
+ // the well-known port for the specified
+ // protocol, "foo.example:https".
+ port := listenURL.Scheme
+ if port == "ws" || port == "wss" {
+ port = "http" + port[2:]
+ }
+ listenAddr = net.JoinHostPort(listenAddr, port)
+ }
+ listener, err := net.Listen("tcp", listenAddr)
if err == nil {
listener.Close()
return listenURL, internalURL, nil