20602: Add IneligibleForQueuePriority const.
[arvados.git] / lib / service / cmd.go
index 92c554ce532386352b3ecf766b3dcd8b2b885fb8..c66c279432dc2f0ae6d5777f6776a1c2a3d063fb 100644 (file)
@@ -12,10 +12,12 @@ import (
        "io"
        "net"
        "net/http"
+       "net/http/httptest"
        _ "net/http/pprof"
        "net/url"
        "os"
        "strings"
+       "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/config"
@@ -45,6 +47,8 @@ type command struct {
        ctx        context.Context // enables tests to shutdown service; no public API yet
 }
 
+var requestQueueDumpCheckInterval = time.Minute
+
 // Command returns a cmd.Handler that loads site config, calls
 // newHandler with the current cluster and node configs, and brings up
 // an http server with the returned handler.
@@ -147,9 +151,15 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
-                               httpserver.LogRequests(
-                                       interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-                                               httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+                               httpserver.Inspect(reg, cluster.ManagementToken,
+                                       httpserver.LogRequests(
+                                               interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+                                                       &httpserver.RequestLimiter{
+                                                               Handler:       handler,
+                                                               MaxConcurrent: cluster.API.MaxConcurrentRequests,
+                                                               MaxQueue:      cluster.API.MaxQueuedRequests,
+                                                               Priority:      c.requestPriority,
+                                                               Registry:      reg}))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -158,7 +168,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                Addr: listenURL.Host,
        }
        if listenURL.Scheme == "https" || listenURL.Scheme == "wss" {
-               tlsconfig, err := tlsConfigWithCertUpdater(cluster, logger)
+               tlsconfig, err := makeTLSConfig(cluster, logger)
                if err != nil {
                        logger.WithError(err).Errorf("cannot start %s service on %s", c.svcName, listenURL.String())
                        return 1
@@ -188,6 +198,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                <-handler.Done()
                srv.Close()
        }()
+       go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
        err = srv.Wait()
        if err != nil {
                return 1
@@ -195,6 +206,74 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        return 0
 }
 
+// If SystemLogs.RequestQueueDumpDirectory is set, monitor the
+// server's incoming HTTP request queue size. When it exceeds 90% of
+// API.MaxConcurrentRequests, write the /_inspect/requests data to a
+// JSON file in the specified directory.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+       outdir := cluster.SystemLogs.RequestQueueDumpDirectory
+       if outdir == "" || cluster.ManagementToken == "" || cluster.API.MaxConcurrentRequests < 1 {
+               return
+       }
+       logger = logger.WithField("worker", "RequestQueueDump")
+       outfile := outdir + "/" + prog + "-requests.json"
+       for range time.NewTicker(requestQueueDumpCheckInterval).C {
+               mfs, err := reg.Gather()
+               if err != nil {
+                       logger.WithError(err).Warn("error getting metrics")
+                       continue
+               }
+               dump := false
+               for _, mf := range mfs {
+                       if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
+                               n := int(mf.Metric[0].GetGauge().GetValue())
+                               if n > 0 && n >= cluster.API.MaxConcurrentRequests*9/10 {
+                                       dump = true
+                                       break
+                               }
+                       }
+               }
+               if dump {
+                       req, err := http.NewRequest("GET", "/_inspect/requests", nil)
+                       if err != nil {
+                               logger.WithError(err).Warn("error in http.NewRequest")
+                               continue
+                       }
+                       req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+                       resp := httptest.NewRecorder()
+                       srv.Handler.ServeHTTP(resp, req)
+                       if code := resp.Result().StatusCode; code != http.StatusOK {
+                               logger.WithField("StatusCode", code).Warn("error getting /_inspect/requests")
+                               continue
+                       }
+                       err = os.WriteFile(outfile, resp.Body.Bytes(), 0777)
+                       if err != nil {
+                               logger.WithError(err).Warn("error writing file")
+                               continue
+                       }
+               }
+       }
+}
+
+func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
+       switch {
+       case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
+               // Return 503 immediately instead of queueing. We want
+               // to send feedback to dispatchcloud ASAP to stop
+               // bringing up new containers.
+               return httpserver.IneligibleForQueuePriority
+       case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
+               // "Create log entry" is the most harmless kind of
+               // request to drop.
+               return 0
+       case req.Header.Get("Origin") != "":
+               // Handle interactive requests first.
+               return 2
+       default:
+               return 1
+       }
+}
+
 // If an incoming request's target vhost has an embedded collection
 // UUID or PDH, handle it with hTrue, otherwise handle it with
 // hFalse.
@@ -223,64 +302,72 @@ func interceptHealthReqs(mgtToken string, checkHealth func() error, next http.Ha
        return ifCollectionInHost(next, mux)
 }
 
+// Determine listenURL (addr:port where server should bind) and
+// internalURL (target url that client should connect to) for a
+// service.
+//
+// If the config does not specify ListenURL, we check all of the
+// configured InternalURLs. If there is exactly one that matches our
+// hostname, or exactly one that matches a local interface address,
+// then we use that as listenURL.
+//
+// Note that listenURL and internalURL may use different protocols
+// (e.g., listenURL is http, but the service sits behind a proxy, so
+// clients connect using https).
 func getListenAddr(svcs arvados.Services, prog arvados.ServiceName, log logrus.FieldLogger) (arvados.URL, arvados.URL, error) {
        svc, ok := svcs.Map()[prog]
        if !ok {
                return arvados.URL{}, arvados.URL{}, fmt.Errorf("unknown service name %q", prog)
        }
 
-       if want := os.Getenv("ARVADOS_SERVICE_INTERNAL_URL"); want == "" {
-       } else if url, err := url.Parse(want); err != nil {
-               return arvados.URL{}, arvados.URL{}, fmt.Errorf("$ARVADOS_SERVICE_INTERNAL_URL (%q): %s", want, err)
-       } else {
+       if want := os.Getenv("ARVADOS_SERVICE_INTERNAL_URL"); want != "" {
+               url, err := url.Parse(want)
+               if err != nil {
+                       return arvados.URL{}, arvados.URL{}, fmt.Errorf("$ARVADOS_SERVICE_INTERNAL_URL (%q): %s", want, err)
+               }
                if url.Path == "" {
                        url.Path = "/"
                }
-               internalURL := arvados.URL(*url)
-               listenURL := arvados.URL(*url)
-               if svc.ListenAddress != "" {
-                       listenURL.Host = svc.ListenAddress
-               }
-               return listenURL, internalURL, nil
-       }
-
-       if svc.ListenAddress != "" {
-               scheme := ""
-               for internalURL := range svc.InternalURLs {
-                       if internalURL.Host == svc.ListenAddress {
-                               if len(svc.InternalURLs) > 1 {
-                                       log.Warnf("possible configuration error: multiple InternalURLs entries exist for %s but only %q will ever be used because it matches ListenAddress", prog, internalURL.String())
+               for internalURL, conf := range svc.InternalURLs {
+                       if internalURL.String() == url.String() {
+                               listenURL := conf.ListenURL
+                               if listenURL.Host == "" {
+                                       listenURL = internalURL
                                }
-                               return internalURL, internalURL, nil
-                       }
-                       switch scheme {
-                       case "":
-                               scheme = internalURL.Scheme
-                       case internalURL.Scheme:
-                       default:
-                               scheme = "-" // different InternalURLs have different schemes
+                               return listenURL, internalURL, nil
                        }
                }
-               if scheme == "-" {
-                       return arvados.URL{}, arvados.URL{}, fmt.Errorf("cannot use ListenAddress %q: InternalURLs use multiple schemes and none have host %q", svc.ListenAddress, svc.ListenAddress)
-               }
-               if scheme == "" {
-                       // No entries at all in InternalURLs
-                       scheme = "http"
-               }
-               listenURL := arvados.URL{}
-               listenURL.Host = svc.ListenAddress
-               listenURL.Scheme = scheme
-               listenURL.Path = "/"
-               return listenURL, listenURL, nil
+               log.Warnf("possible configuration error: listening on %s (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry", url)
+               internalURL := arvados.URL(*url)
+               return internalURL, internalURL, nil
        }
 
        errors := []string{}
-       for internalURL := range svc.InternalURLs {
-               listener, err := net.Listen("tcp", internalURL.Host)
+       for internalURL, conf := range svc.InternalURLs {
+               listenURL := conf.ListenURL
+               if listenURL.Host == "" {
+                       // If ListenURL is not specified, assume
+                       // InternalURL is also usable as the listening
+                       // proto/addr/port (i.e., simple case with no
+                       // intermediate proxy/routing)
+                       listenURL = internalURL
+               }
+               listenAddr := listenURL.Host
+               if _, _, err := net.SplitHostPort(listenAddr); err != nil {
+                       // url "https://foo.example/" (with no
+                       // explicit port name/number) means listen on
+                       // the well-known port for the specified
+                       // protocol, "foo.example:https".
+                       port := listenURL.Scheme
+                       if port == "ws" || port == "wss" {
+                               port = "http" + port[2:]
+                       }
+                       listenAddr = net.JoinHostPort(listenAddr, port)
+               }
+               listener, err := net.Listen("tcp", listenAddr)
                if err == nil {
                        listener.Close()
-                       return internalURL, internalURL, nil
+                       return listenURL, internalURL, nil
                } else if strings.Contains(err.Error(), "cannot assign requested address") {
                        // If 'Host' specifies a different server than
                        // the current one, it'll resolve the hostname
@@ -288,7 +375,7 @@ func getListenAddr(svcs arvados.Services, prog arvados.ServiceName, log logrus.F
                        // can't bind an IP address it doesn't own.
                        continue
                } else {
-                       errors = append(errors, fmt.Sprintf("tried %v, got %v", internalURL, err))
+                       errors = append(errors, fmt.Sprintf("%s: %s", listenURL, err))
                }
        }
        if len(errors) > 0 {