Merge branch '18870-installer' refs #18870
author Peter Amstutz <peter.amstutz@curii.com>
Thu, 30 Jun 2022 18:06:48 +0000 (14:06 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Thu, 30 Jun 2022 18:06:48 +0000 (14:06 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

12 files changed:
doc/admin/config-urls.html.textile.liquid
lib/config/cmd_test.go
lib/config/config.default.yml
lib/controller/handler.go
lib/controller/handler_test.go
lib/service/cmd.go
lib/service/cmd_test.go
sdk/go/arvados/config.go
sdk/go/httpserver/inspect.go [new file with mode: 0644]
sdk/go/httpserver/inspect_test.go [new file with mode: 0644]
sdk/go/httpserver/logger.go
sdk/go/httpserver/request_limiter_test.go

index e518ea1bf7c11c8791e0aff4c6c220a34c39735f..01c30f0e0eb88eecabc0269cabd40c3aabb07892 100644 (file)
@@ -16,9 +16,9 @@ The @Services@ section lists a number of Arvados services, each with an @Interna
 
 The @ExternalURL@ is the address where the service should be reachable by clients, both from inside and from outside the Arvados cluster. Some services do not expose an Arvados API, only Prometheus metrics. In that case, @ExternalURL@ is not used.
 
-The keys under @InternalURLs@ are addresses that are used by the reverse proxy (e.g. Nginx) that fronts Arvados services. The exception is the @Keepstore@ service, where clients connect directly to the addresses listed under @InternalURLs@. If a service is not fronted by a reverse proxy, e.g. when its endpoint only exposes Prometheus metrics, the intention is that metrics are collected directly from the endpoints defined in @InternalURLs@.
+The keys under @InternalURLs@ are the URLs through which Arvados system components can connect to one another, including the reverse proxy (e.g. Nginx) that fronts Arvados services. The exception is the @Keepstore@ service, where clients on the local network connect directly to @Keepstore.InternalURLs@ (while clients from outside networks connect to @Keepproxy.ExternalURL@). If a service is not fronted by a reverse proxy, e.g. when its endpoint only exposes Prometheus metrics, the intention is that metrics are collected directly from the endpoints defined in @InternalURLs@.
 
-@InternalURLs@ are also used by the service itself to figure out which address/port to listen on.
+Each entry in the @InternalURLs@ section may also indicate a @ListenURL@ to determine the protocol, address/interface, and port where the service process will listen, in case the desired listening address differs from the @InternalURLs@ key itself -- for example, when passing internal traffic through a reverse proxy.
 
 If the Arvados service lives behind a reverse proxy (e.g. Nginx), configuring the reverse proxy and the @InternalURLs@ and @ExternalURL@ values must be done in concert.
 
@@ -228,11 +228,12 @@ Consider this section for the @Controller@ service:
 {% codeblock as yaml %}
   Controller:
     InternalURLs:
-      "http://localhost:8003": {}
+      "https://ctrl-0.internal":
+        ListenURL: "http://localhost:8003"
     ExternalURL: "https://ClusterID.example.com"
 {% endcodeblock %}
 
-The @ExternalURL@ advertised is @https://ClusterID.example.com@. The @Controller@ service will start up on @localhost@ port 8003. Nginx is configured to sit in front of the @Controller@ service and terminates SSL:
+The @ExternalURL@ advertised to clients is @https://ClusterID.example.com@. The @arvados-controller@ process will listen on @localhost@ port 8003. Other Arvados service processes in the cluster can connect to this specific controller instance, using the URL @https://ctrl-0.internal@. Nginx is configured to sit in front of the @Controller@ service and terminate TLS:
 
 <notextile><pre><code>
 # This is the port where nginx expects to contact arvados-controller.
@@ -245,7 +246,7 @@ server {
   # the request is reverse proxied to the upstream 'controller'
 
   listen       443 ssl;
-  server_name  ClusterID.example.com;
+  server_name  ClusterID.example.com ctrl-0.internal;
 
   ssl_certificate     /YOUR/PATH/TO/cert.pem;
   ssl_certificate_key /YOUR/PATH/TO/cert.key;
@@ -275,4 +276,13 @@ server {
 }
 </code></pre></notextile>
 
+If the host part of @ListenURL@ is ambiguous, in the sense that more than one system host is able to listen on that address (e.g., @localhost@), configure each host's startup scripts to set the environment variable @ARVADOS_SERVICE_INTERNAL_URL@ to the @InternalURLs@ key that will reach that host. In the example above, this would be @ARVADOS_SERVICE_INTERNAL_URL=https://ctrl-0.internal@.
+
+If the cluster has just a single node running all of the Arvados server processes, configuration can be simplified:
 
+{% codeblock as yaml %}
+  Controller:
+    InternalURLs:
+      "http://localhost:8003": {}
+    ExternalURL: "https://ClusterID.example.com"
+{% endcodeblock %}
index 7167982ccd7021f3b43a895190909694c493b7da..9503a54d2d7c137ec5c2e805a3aaec9b990014ce 100644 (file)
@@ -217,7 +217,7 @@ Clusters:
        code := DumpCommand.RunCommand("arvados config-dump", []string{"-config", "-"}, bytes.NewBufferString(in), &stdout, &stderr)
        c.Check(code, check.Equals, 0)
        c.Check(stdout.String(), check.Matches, `(?ms).*TimeoutBooting: 10m\n.*`)
-       c.Check(stdout.String(), check.Matches, `(?ms).*http://localhost:12345/: {}\n.*`)
+       c.Check(stdout.String(), check.Matches, `(?ms).*http://localhost:12345/:\n +ListenURL: ""\n.*`)
 }
 
 func (s *CommandSuite) TestDump_UnknownKey(c *check.C) {
index a9bbf4eee9b5002e733cb46df5ffe9be995ffdcf..472a22c6b2cb11a3566d882e6420f52400ca4b13 100644 (file)
@@ -22,47 +22,78 @@ Clusters:
 
     Services:
 
-      # In each of the service sections below, the keys under
-      # InternalURLs are the endpoints where the service should be
-      # listening, and reachable from other hosts in the
-      # cluster. Example:
+      # Each of the service sections below specifies InternalURLs
+      # (each with optional ListenURL) and ExternalURL.
+      #
+      # InternalURLs specify how other Arvados service processes will
+      # connect to the service. Typically these use internal hostnames
+      # and high port numbers. Example:
+      #
+      # InternalURLs:
+      #   "http://host1.internal.example:12345": {}
+      #   "http://host2.internal.example:12345": {}
+      #
+      # ListenURL specifies the address and port the service process's
+      # HTTP server should listen on, if different from the
+      # InternalURL itself. Example, using an intermediate TLS proxy:
       #
       # InternalURLs:
-      #   "http://host1.example:12345": {}
-      #   "http://host2.example:12345": {}
+      #   "https://host1.internal.example":
+      #     ListenURL: "http://10.0.0.7:12345"
+      #
+      # When there are multiple InternalURLs configured, the service
+      # process will try listening on each InternalURLs entry (using
+      # ListenURL if provided) until one works. If you use a ListenURL
+      # host like "0.0.0.0" which can be bound on any machine, set the
+      # environment variable
+      # ARVADOS_SERVICE_INTERNAL_URL=http://host1.internal.example to
+      # control which entry to use.
+      #
+      # ExternalURL specifies how applications/clients will connect to
+      # the service, regardless of whether they are inside or outside
+      # the cluster. Example:
+      #
+      # ExternalURL: "https://keep.zzzzz.example.com/"
+      #
+      # To avoid routing internal traffic through external networks,
+      # use split-horizon DNS for ExternalURL host names: inside the
+      # cluster's private network "host.zzzzz.example.com" resolves to
+      # the host's private IP address, while outside the cluster
+      # "host.zzzzz.example.com" resolves to the host's public IP
+      # address (or its external gateway or load balancer).
 
       RailsAPI:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       Controller:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       Websocket:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       Keepbalance:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       GitHTTP:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       GitSSH:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       DispatchCloud:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       DispatchLSF:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       DispatchSLURM:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       Keepproxy:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       WebDAV:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         # Base URL for Workbench inline preview.  If blank, use
         # WebDAVDownload instead, and disable inline preview.
         # If both are empty, downloading collections from workbench
@@ -101,7 +132,7 @@ Clusters:
         ExternalURL: ""
 
       WebDAVDownload:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         # Base URL for download links. If blank, serve links to WebDAV
         # with disposition=attachment query param.  Unlike preview links,
         # browsers do not render attachments, so there is no risk of XSS.
@@ -117,6 +148,7 @@ Clusters:
       Keepstore:
         InternalURLs:
           SAMPLE:
+            ListenURL: ""
             # Rendezvous is normally empty/omitted. When changing the
             # URL of a Keepstore service, Rendezvous should be set to
             # the old URL (with trailing slash omitted) to preserve
@@ -124,10 +156,10 @@ Clusters:
             Rendezvous: ""
         ExternalURL: ""
       Composer:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       WebShell:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         # ShellInABox service endpoint URL for a given VM.  If empty, do not
         # offer web shell logins.
         #
@@ -138,13 +170,13 @@ Clusters:
         # https://*.webshell.uuid_prefix.arvadosapi.com
         ExternalURL: ""
       Workbench1:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       Workbench2:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
       Health:
-        InternalURLs: {SAMPLE: {}}
+        InternalURLs: {SAMPLE: {ListenURL: ""}}
         ExternalURL: ""
 
     PostgreSQL:
index f5840b34ce72cd18da4d75c4d27dbb23920e53dd..665fd5c636372fc4a21bd7de68c5d886aafbcc7c 100644 (file)
@@ -13,7 +13,6 @@ import (
        "net/url"
        "strings"
        "sync"
-       "time"
 
        "git.arvados.org/arvados.git/lib/controller/api"
        "git.arvados.org/arvados.git/lib/controller/federation"
@@ -61,12 +60,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
                        req.URL.Path = strings.Replace(req.URL.Path, "//", "/", -1)
                }
        }
-       if h.Cluster.API.RequestTimeout > 0 {
-               ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(time.Duration(h.Cluster.API.RequestTimeout)))
-               req = req.WithContext(ctx)
-               defer cancel()
-       }
-
        h.handlerStack.ServeHTTP(w, req)
 }
 
index 5e467cb0588607d3deaa06c1d92326ed18f8f09c..39c2b1c68e5c82921e10bc9125d54e17846a8fed 100644 (file)
@@ -204,17 +204,21 @@ func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
        c.Check(len(dd.Schemas), check.Not(check.Equals), 0)
 }
 
-func (s *HandlerSuite) TestRequestTimeout(c *check.C) {
-       s.cluster.API.RequestTimeout = arvados.Duration(time.Nanosecond)
-       req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
+// Handler should give up and exit early if request context is
+// cancelled due to client hangup, httpserver.HandlerWithDeadline,
+// etc.
+func (s *HandlerSuite) TestRequestCancel(c *check.C) {
+       ctx, cancel := context.WithCancel(context.Background())
+       req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil).WithContext(ctx)
        resp := httptest.NewRecorder()
+       cancel()
        s.handler.ServeHTTP(resp, req)
        c.Check(resp.Code, check.Equals, http.StatusBadGateway)
        var jresp httpserver.ErrorResponse
        err := json.Unmarshal(resp.Body.Bytes(), &jresp)
        c.Check(err, check.IsNil)
        c.Assert(len(jresp.Errors), check.Equals, 1)
-       c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded.*`)
+       c.Check(jresp.Errors[0], check.Matches, `.*context canceled`)
 }
 
 func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) {
index 679cbede13bc8cf141a34ec40373a458c5195451..4b640c4e4773225ccb0e9312bc18a436552e9cfb 100644 (file)
@@ -121,11 +121,11 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        })
        ctx := ctxlog.Context(c.ctx, logger)
 
-       listenURL, err := getListenAddr(cluster.Services, c.svcName, log)
+       listenURL, internalURL, err := getListenAddr(cluster.Services, c.svcName, log)
        if err != nil {
                return 1
        }
-       ctx = context.WithValue(ctx, contextKeyURL{}, listenURL)
+       ctx = context.WithValue(ctx, contextKeyURL{}, internalURL)
 
        reg := prometheus.NewRegistry()
        loader.RegisterMetrics(reg)
@@ -147,9 +147,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
-                               httpserver.LogRequests(
-                                       interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-                                               httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+                               httpserver.Inspect(reg, cluster.ManagementToken,
+                                       httpserver.LogRequests(
+                                               interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+                                                       httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -157,7 +158,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                },
                Addr: listenURL.Host,
        }
-       if listenURL.Scheme == "https" {
+       if listenURL.Scheme == "https" || listenURL.Scheme == "wss" {
                tlsconfig, err := tlsConfigWithCertUpdater(cluster, logger)
                if err != nil {
                        logger.WithError(err).Errorf("cannot start %s service on %s", c.svcName, listenURL.String())
@@ -223,28 +224,72 @@ func interceptHealthReqs(mgtToken string, checkHealth func() error, next http.Ha
        return ifCollectionInHost(next, mux)
 }
 
-func getListenAddr(svcs arvados.Services, prog arvados.ServiceName, log logrus.FieldLogger) (arvados.URL, error) {
+// Determine listenURL (addr:port where server should bind) and
+// internalURL (target url that client should connect to) for a
+// service.
+//
+// If $ARVADOS_SERVICE_INTERNAL_URL is set, it determines internalURL
+// (and listenURL, via the matching InternalURLs entry if there is
+// one). Otherwise, we try each InternalURLs entry (using ListenURL
+// if specified, else the InternalURLs key) until listening succeeds.
+//
+// Note that listenURL and internalURL may use different protocols
+// (e.g., listenURL is http, but the service sits behind a proxy, so
+// clients connect using https).
+func getListenAddr(svcs arvados.Services, prog arvados.ServiceName, log logrus.FieldLogger) (arvados.URL, arvados.URL, error) {
        svc, ok := svcs.Map()[prog]
        if !ok {
-               return arvados.URL{}, fmt.Errorf("unknown service name %q", prog)
+               return arvados.URL{}, arvados.URL{}, fmt.Errorf("unknown service name %q", prog)
        }
 
-       if want := os.Getenv("ARVADOS_SERVICE_INTERNAL_URL"); want == "" {
-       } else if url, err := url.Parse(want); err != nil {
-               return arvados.URL{}, fmt.Errorf("$ARVADOS_SERVICE_INTERNAL_URL (%q): %s", want, err)
-       } else {
+       if want := os.Getenv("ARVADOS_SERVICE_INTERNAL_URL"); want != "" {
+               url, err := url.Parse(want)
+               if err != nil {
+                       return arvados.URL{}, arvados.URL{}, fmt.Errorf("$ARVADOS_SERVICE_INTERNAL_URL (%q): %s", want, err)
+               }
                if url.Path == "" {
                        url.Path = "/"
                }
-               return arvados.URL(*url), nil
+               for internalURL, conf := range svc.InternalURLs {
+                       if internalURL.String() == url.String() {
+                               listenURL := conf.ListenURL
+                               if listenURL.Host == "" {
+                                       listenURL = internalURL
+                               }
+                               return listenURL, internalURL, nil
+                       }
+               }
+               log.Warnf("possible configuration error: listening on %s (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry", url)
+               internalURL := arvados.URL(*url)
+               return internalURL, internalURL, nil
        }
 
        errors := []string{}
-       for url := range svc.InternalURLs {
-               listener, err := net.Listen("tcp", url.Host)
+       for internalURL, conf := range svc.InternalURLs {
+               listenURL := conf.ListenURL
+               if listenURL.Host == "" {
+                       // If ListenURL is not specified, assume
+                       // InternalURL is also usable as the listening
+                       // proto/addr/port (i.e., simple case with no
+                       // intermediate proxy/routing)
+                       listenURL = internalURL
+               }
+               listenAddr := listenURL.Host
+               if _, _, err := net.SplitHostPort(listenAddr); err != nil {
+                       // url "https://foo.example/" (with no
+                       // explicit port name/number) means listen on
+                       // the well-known port for the specified
+                       // protocol, "foo.example:https".
+                       port := listenURL.Scheme
+                       if port == "ws" || port == "wss" {
+                               port = "http" + port[2:]
+                       }
+                       listenAddr = net.JoinHostPort(listenAddr, port)
+               }
+               listener, err := net.Listen("tcp", listenAddr)
                if err == nil {
                        listener.Close()
-                       return url, nil
+                       return listenURL, internalURL, nil
                } else if strings.Contains(err.Error(), "cannot assign requested address") {
                        // If 'Host' specifies a different server than
                        // the current one, it'll resolve the hostname
@@ -252,13 +297,13 @@ func getListenAddr(svcs arvados.Services, prog arvados.ServiceName, log logrus.F
                        // can't bind an IP address it doesn't own.
                        continue
                } else {
-                       errors = append(errors, fmt.Sprintf("tried %v, got %v", url, err))
+                       errors = append(errors, fmt.Sprintf("%s: %s", listenURL, err))
                }
        }
        if len(errors) > 0 {
-               return arvados.URL{}, fmt.Errorf("could not enable the %q service on this host: %s", prog, strings.Join(errors, "; "))
+               return arvados.URL{}, arvados.URL{}, fmt.Errorf("could not enable the %q service on this host: %s", prog, strings.Join(errors, "; "))
        }
-       return arvados.URL{}, fmt.Errorf("configuration does not enable the %q service on this host", prog)
+       return arvados.URL{}, arvados.URL{}, fmt.Errorf("configuration does not enable the %q service on this host", prog)
 }
 
 type contextKeyURL struct{}
index 10591d9b55cf44beb41e7a898a296f20a0aab851..7db91092745e2e4886f0b1b35a3015da0f0387fc 100644 (file)
@@ -11,7 +11,9 @@ import (
        "crypto/tls"
        "fmt"
        "io/ioutil"
+       "net"
        "net/http"
+       "net/url"
        "os"
        "testing"
        "time"
@@ -35,6 +37,126 @@ const (
        contextKey key = iota
 )
 
+func (*Suite) TestGetListenAddress(c *check.C) {
+       // Find an available port on the testing host, so the test
+       // cases don't get confused by "already in use" errors.
+       listener, err := net.Listen("tcp", ":")
+       c.Assert(err, check.IsNil)
+       _, unusedPort, err := net.SplitHostPort(listener.Addr().String())
+       c.Assert(err, check.IsNil)
+       listener.Close()
+
+       defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
+       for idx, trial := range []struct {
+               // internalURL => listenURL, both with trailing "/"
+               // because config loader always adds it
+               internalURLs     map[string]string
+               envVar           string
+               expectErrorMatch string
+               expectLogsMatch  string
+               expectListen     string
+               expectInternal   string
+       }{
+               {
+                       internalURLs:   map[string]string{"http://localhost:" + unusedPort + "/": ""},
+                       expectListen:   "http://localhost:" + unusedPort + "/",
+                       expectInternal: "http://localhost:" + unusedPort + "/",
+               },
+               { // implicit port 80 in InternalURLs
+                       internalURLs:     map[string]string{"http://localhost/": ""},
+                       expectErrorMatch: `.*:80: bind: permission denied`,
+               },
+               { // implicit port 443 in InternalURLs
+                       internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + unusedPort + "/"},
+                       expectListen:   "http://localhost:" + unusedPort + "/",
+                       expectInternal: "https://host.example/",
+               },
+               { // implicit port 443 in ListenURL
+                       internalURLs:     map[string]string{"wss://host.example/": "wss://localhost/"},
+                       expectErrorMatch: `.*:443: bind: permission denied`,
+               },
+               {
+                       internalURLs:   map[string]string{"https://hostname.example/": "http://localhost:8000/"},
+                       expectListen:   "http://localhost:8000/",
+                       expectInternal: "https://hostname.example/",
+               },
+               {
+                       internalURLs: map[string]string{
+                               "https://hostname1.example/": "http://localhost:12435/",
+                               "https://hostname2.example/": "http://localhost:" + unusedPort + "/",
+                       },
+                       envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
+                       expectListen:   "http://localhost:" + unusedPort + "/",
+                       expectInternal: "https://hostname2.example/",
+               },
+               { // cannot listen on any of the ListenURLs
+                       internalURLs: map[string]string{
+                               "https://hostname1.example/": "http://1.2.3.4:" + unusedPort + "/",
+                               "https://hostname2.example/": "http://1.2.3.4:" + unusedPort + "/",
+                       },
+                       expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
+               },
+               { // cannot listen on any of the (implied) ListenURLs
+                       internalURLs: map[string]string{
+                               "https://1.2.3.4/": "",
+                               "https://1.2.3.5/": "",
+                       },
+                       expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
+               },
+               { // impossible port number
+                       internalURLs: map[string]string{
+                               "https://host.example/": "http://0.0.0.0:1234567",
+                       },
+                       expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
+               },
+               {
+                       // env var URL not mentioned in config = obey env var, with warning
+                       internalURLs:    map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
+                       envVar:          "https://hostname2.example",
+                       expectListen:    "https://hostname2.example/",
+                       expectInternal:  "https://hostname2.example/",
+                       expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
+               },
+               {
+                       // env var + empty config = obey env var, with warning
+                       envVar:          "https://hostname.example",
+                       expectListen:    "https://hostname.example/",
+                       expectInternal:  "https://hostname.example/",
+                       expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
+               },
+       } {
+               c.Logf("trial %d %+v", idx, trial)
+               os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
+               var logbuf bytes.Buffer
+               log := ctxlog.New(&logbuf, "text", "info")
+               services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
+               for k, v := range trial.internalURLs {
+                       u, err := url.Parse(k)
+                       c.Assert(err, check.IsNil)
+                       si := arvados.ServiceInstance{}
+                       if v != "" {
+                               u, err := url.Parse(v)
+                               c.Assert(err, check.IsNil)
+                               si.ListenURL = arvados.URL(*u)
+                       }
+                       services.Controller.InternalURLs[arvados.URL(*u)] = si
+               }
+               listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
+               if trial.expectLogsMatch != "" {
+                       c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
+               }
+               if trial.expectErrorMatch != "" {
+                       c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
+                       continue
+               }
+               if !c.Check(err, check.IsNil) {
+                       continue
+               }
+               c.Check(listenURL.String(), check.Equals, trial.expectListen)
+               c.Check(internalURL.String(), check.Equals, trial.expectInternal)
+       }
+}
+
 func (*Suite) TestCommand(c *check.C) {
        cf, err := ioutil.TempFile("", "cmd_test.")
        c.Assert(err, check.IsNil)
index 0d8f293124976cb42f9d009da1b16c5057ba1071..c90551a6109af9dc9afbdd33bed9c78f5f7bc5ed 100644 (file)
@@ -401,6 +401,7 @@ func (su URL) String() string {
 }
 
 type ServiceInstance struct {
+       ListenURL  URL
        Rendezvous string `json:",omitempty"`
 }
 
diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go
new file mode 100644 (file)
index 0000000..cb08acf
--- /dev/null
@@ -0,0 +1,133 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+       "encoding/json"
+       "net/http"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+)
+
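+// Inspect serves a report of current requests at "GET
+// /_inspect/requests", and passes other requests through to the next
+// handler. The report is served only if authToken is non-empty and
+// the request header is "Authorization: Bearer " + authToken.
+//
+// If registry is not nil, Inspect registers metrics about current requests.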
+func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+       type ent struct {
+               startTime  time.Time
+               hangupTime atomic.Value
+       }
+       current := map[*http.Request]*ent{}
+       mtx := sync.Mutex{}
+       if registry != nil {
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_active_request_age_seconds",
+                               Help:      "Age of oldest active request",
+                       },
+                       func() float64 {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               earliest := time.Time{}
+                               any := false
+                               for _, e := range current {
+                                       if _, ok := e.hangupTime.Load().(time.Time); ok {
+                                               // Don't count abandoned requests here
+                                               continue
+                                       }
+                                       if !any || e.startTime.Before(earliest) {
+                                               any = true
+                                               earliest = e.startTime
+                                       }
+                               }
+                               if !any {
+                                       return 0
+                               }
+                               return float64(time.Since(earliest).Seconds())
+                       },
+               ))
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_abandoned_request_age_seconds",
+                               Help:      "Maximum time since client hung up on a request whose processing thread is still running",
+                       },
+                       func() float64 {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               earliest := time.Time{}
+                               any := false
+                               for _, e := range current {
+                                       if hangupTime, ok := e.hangupTime.Load().(time.Time); ok {
+                                               if !any || hangupTime.Before(earliest) {
+                                                       any = true
+                                                       earliest = hangupTime
+                                               }
+                                       }
+                               }
+                               if !any {
+                                       return 0
+                               }
+                               return float64(time.Since(earliest).Seconds())
+                       },
+               ))
+       }
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Method == "GET" && req.URL.Path == "/_inspect/requests" {
+                       if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
+                               Error(w, "unauthorized", http.StatusUnauthorized)
+                               return
+                       }
+                       mtx.Lock()
+                       defer mtx.Unlock()
+                       type outrec struct {
+                               RequestID  string
+                               Method     string
+                               Host       string
+                               URL        string
+                               RemoteAddr string
+                               Elapsed    float64
+                       }
+                       now := time.Now()
+                       outrecs := []outrec{}
+                       for req, e := range current {
+                               outrecs = append(outrecs, outrec{
+                                       RequestID:  req.Header.Get(HeaderRequestID),
+                                       Method:     req.Method,
+                                       Host:       req.Host,
+                                       URL:        req.URL.String(),
+                                       RemoteAddr: req.RemoteAddr,
+                                       Elapsed:    now.Sub(e.startTime).Seconds(),
+                               })
+                       }
+                       sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+                       w.Header().Set("Content-Type", "application/json")
+                       json.NewEncoder(w).Encode(outrecs)
+               } else {
+                       e := ent{startTime: time.Now()}
+                       mtx.Lock()
+                       current[req] = &e
+                       mtx.Unlock()
+                       go func() {
+                               <-req.Context().Done()
+                               e.hangupTime.Store(time.Now())
+                       }()
+                       defer func() {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               delete(current, req)
+                       }()
+                       next.ServeHTTP(w, req)
+               }
+       })
+}
diff --git a/sdk/go/httpserver/inspect_test.go b/sdk/go/httpserver/inspect_test.go
new file mode 100644 (file)
index 0000000..624cedb
--- /dev/null
@@ -0,0 +1,98 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+       "context"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "strings"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       check "gopkg.in/check.v1"
+)
+
+func (s *Suite) TestInspect(c *check.C) {
+       reg := prometheus.NewRegistry()
+       h := newTestHandler()
+       mh := Inspect(reg, "abcd", h)
+       handlerReturned := make(chan struct{})
+       reqctx, reqcancel := context.WithCancel(context.Background())
+       longreq := httptest.NewRequest("GET", "/test", nil).WithContext(reqctx)
+       go func() {
+               mh.ServeHTTP(httptest.NewRecorder(), longreq)
+               close(handlerReturned)
+       }()
+       <-h.inHandler
+
+       resp := httptest.NewRecorder()
+       req := httptest.NewRequest("GET", "/_inspect/requests", nil)
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+       c.Check(resp.Body.String(), check.Equals, `{"errors":["unauthorized"]}`+"\n")
+
+       resp = httptest.NewRecorder()
+       req.Header.Set("Authorization", "Bearer abcde")
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+       resp = httptest.NewRecorder()
+       req.Header.Set("Authorization", "Bearer abcd")
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       reqs := []map[string]interface{}{}
+       err := json.NewDecoder(resp.Body).Decode(&reqs)
+       c.Check(err, check.IsNil)
+       c.Check(reqs, check.HasLen, 1)
+       c.Check(reqs[0]["URL"], check.Equals, "/test")
+
+       // Request is active, so we should see active request age > 0
+       resp = httptest.NewRecorder()
+       mreq := httptest.NewRequest("GET", "/metrics", nil)
+       promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+       reqcancel()
+
+       // Request context is canceled but handler hasn't returned, so
+       // we should see max abandoned request age > 0 and active ==
+       // 0. We might need to wait a short time for the cancel to
+       // propagate.
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); time.Sleep(time.Second / 100) {
+               resp = httptest.NewRecorder()
+               promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+               c.Assert(resp.Code, check.Equals, http.StatusOK)
+               if strings.Contains(resp.Body.String(), "\narvados_max_active_request_age_seconds 0\n") {
+                       break
+               }
+       }
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+
+       h.okToProceed <- struct{}{}
+       <-handlerReturned
+
+       // Handler has returned, so we should see max abandoned
+       // request age == max active request age == 0
+       resp = httptest.NewRecorder()
+       promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+       // ...and no active requests at the /_monitor endpoint
+       resp = httptest.NewRecorder()
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       reqs = nil
+       err = json.NewDecoder(resp.Body).Decode(&reqs)
+       c.Check(err, check.IsNil)
+       c.Assert(reqs, check.HasLen, 0)
+}
index 5a46635e9102365bbfd01c9c9c120bd8e23a7026..b71adf71181a9eb6b550093f73dbe4ab884038ce 100644 (file)
@@ -47,7 +47,13 @@ func (hn hijackNotifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
 // HandlerWithDeadline cancels the request context if the request
 // takes longer than the specified timeout without having its
 // connection hijacked.
+//
+// If timeout is 0, there is no deadline: HandlerWithDeadline is a
+// no-op.
 func HandlerWithDeadline(timeout time.Duration, next http.Handler) http.Handler {
+       if timeout == 0 {
+               return next
+       }
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                ctx, cancel := context.WithCancel(r.Context())
                defer cancel()
index 64d1f3d4cfb3fc47930ad1d655ae97366af6efb0..9258fbfa58f4b5a4867651fc15aba4e9b9616dcf 100644 (file)
@@ -22,7 +22,7 @@ func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        <-h.okToProceed
 }
 
-func newTestHandler(maxReqs int) *testHandler {
+func newTestHandler() *testHandler {
        return &testHandler{
                inHandler:   make(chan struct{}),
                okToProceed: make(chan struct{}),
@@ -30,7 +30,7 @@ func newTestHandler(maxReqs int) *testHandler {
 }
 
 func TestRequestLimiter1(t *testing.T) {
-       h := newTestHandler(10)
+       h := newTestHandler()
        l := NewRequestLimiter(1, h, nil)
        var wg sync.WaitGroup
        resps := make([]*httptest.ResponseRecorder, 10)
@@ -90,7 +90,7 @@ func TestRequestLimiter1(t *testing.T) {
 }
 
 func TestRequestLimiter10(t *testing.T) {
-       h := newTestHandler(10)
+       h := newTestHandler()
        l := NewRequestLimiter(10, h, nil)
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {