13493: Merge branch 'master' into 13493-federation-proxy
authorTom Clegg <tclegg@veritasgenetics.com>
Mon, 16 Jul 2018 13:46:23 +0000 (09:46 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Mon, 16 Jul 2018 13:46:23 +0000 (09:46 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

1  2 
lib/controller/handler.go
lib/controller/handler_test.go
sdk/go/arvados/config.go
sdk/python/tests/run_test_server.py

index c50f98273c10bc19e0b9da44409f53545c636461,a1a69a88e4ccd1bbb6d4882620581e91b3a03523..69b1866162c6fe1488ba3ca0b76d92817a4658ef
@@@ -5,8 -5,8 +5,8 @@@
  package controller
  
  import (
 -      "context"
 -      "io"
 +      "database/sql"
 +      "errors"
        "net"
        "net/http"
        "net/url"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
 +      _ "github.com/lib/pq"
  )
  
  type Handler struct {
        Cluster     *arvados.Cluster
        NodeProfile *arvados.NodeProfile
  
 -      setupOnce    sync.Once
 -      handlerStack http.Handler
 -      proxyClient  *arvados.Client
 +      setupOnce      sync.Once
 +      handlerStack   http.Handler
 +      proxy          *proxy
 +      secureClient   *http.Client
 +      insecureClient *http.Client
 +      pgdb           *sql.DB
 +      pgdbMtx        sync.Mutex
  }
  
  func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        h.setupOnce.Do(h.setup)
+       if req.Method != "GET" && req.Method != "HEAD" {
+               // http.ServeMux returns 301 with a cleaned path if
+               // the incoming request has a double slash. Some
+               // clients (including the Go standard library) change
+               // the request method to GET when following a 301
+               // redirect if the original method was not HEAD
+               // (RFC7231 6.4.2 specifically allows this in the case
+               // of POST). Thus "POST //foo" gets misdirected to
+               // "GET /foo". To avoid this, eliminate double slashes
+               // before passing the request to ServeMux.
+               for strings.Contains(req.URL.Path, "//") {
+                       req.URL.Path = strings.Replace(req.URL.Path, "//", "/", -1)
+               }
+       }
        h.handlerStack.ServeHTTP(w, req)
  }
  
  func (h *Handler) CheckHealth() error {
        h.setupOnce.Do(h.setup)
 -      _, err := findRailsAPI(h.Cluster, h.NodeProfile)
 +      _, _, err := findRailsAPI(h.Cluster, h.NodeProfile)
        return err
  }
  
@@@ -50,82 -59,94 +64,87 @@@ func (h *Handler) setup() 
                Token:  h.Cluster.ManagementToken,
                Prefix: "/_health/",
        })
 -      mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
 +      hs := http.NotFoundHandler()
 +      hs = prepend(hs, h.proxyRailsAPI)
 +      hs = prepend(hs, h.proxyRemoteCluster)
 +      mux.Handle("/", hs)
        h.handlerStack = mux
  
 +      sc := *arvados.DefaultSecureClient
 +      sc.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
 +      h.secureClient = &sc
 +
 +      ic := *arvados.InsecureHTTPClient
 +      ic.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
 +      h.insecureClient = &ic
 +
 +      h.proxy = &proxy{
 +              Name:           "arvados-controller",
 +              RequestTimeout: time.Duration(h.Cluster.HTTPRequestTimeout),
 +      }
++
+       // Changing the global isn't the right way to do this, but a
+       // proper solution would conflict with an impending 13493
+       // merge anyway, so this will do for now.
+       arvados.InsecureHTTPClient.CheckRedirect = func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse }
  }
  
 -// headers that shouldn't be forwarded when proxying. See
 -// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
 -var dropHeaders = map[string]bool{
 -      "Connection":          true,
 -      "Keep-Alive":          true,
 -      "Proxy-Authenticate":  true,
 -      "Proxy-Authorization": true,
 -      "TE":                true,
 -      "Trailer":           true,
 -      "Transfer-Encoding": true,
 -      "Upgrade":           true,
 -}
 +var errDBConnection = errors.New("database connection error")
  
 -func (h *Handler) proxyRailsAPI(w http.ResponseWriter, reqIn *http.Request) {
 -      urlOut, err := findRailsAPI(h.Cluster, h.NodeProfile)
 -      if err != nil {
 -              httpserver.Error(w, err.Error(), http.StatusInternalServerError)
 -              return
 -      }
 -      urlOut = &url.URL{
 -              Scheme:   urlOut.Scheme,
 -              Host:     urlOut.Host,
 -              Path:     reqIn.URL.Path,
 -              RawPath:  reqIn.URL.RawPath,
 -              RawQuery: reqIn.URL.RawQuery,
 +func (h *Handler) db(req *http.Request) (*sql.DB, error) {
 +      h.pgdbMtx.Lock()
 +      defer h.pgdbMtx.Unlock()
 +      if h.pgdb != nil {
 +              return h.pgdb, nil
        }
  
 -      // Copy headers from incoming request, then add/replace proxy
 -      // headers like Via and X-Forwarded-For.
 -      hdrOut := http.Header{}
 -      for k, v := range reqIn.Header {
 -              if !dropHeaders[k] {
 -                      hdrOut[k] = v
 -              }
 +      db, err := sql.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
 +      if err != nil {
 +              httpserver.Logger(req).WithError(err).Error("postgresql connect failed")
 +              return nil, errDBConnection
        }
 -      xff := reqIn.RemoteAddr
 -      if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
 -              xff = xffIn + "," + xff
 +      if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
 +              db.SetMaxOpenConns(p)
        }
 -      hdrOut.Set("X-Forwarded-For", xff)
 -      if hdrOut.Get("X-Forwarded-Proto") == "" {
 -              hdrOut.Set("X-Forwarded-Proto", reqIn.URL.Scheme)
 +      if err := db.Ping(); err != nil {
 +              httpserver.Logger(req).WithError(err).Error("postgresql connect succeeded but ping failed")
 +              return nil, errDBConnection
        }
 -      hdrOut.Add("Via", reqIn.Proto+" arvados-controller")
 +      h.pgdb = db
 +      return db, nil
 +}
  
 -      ctx := reqIn.Context()
 -      if timeout := h.Cluster.HTTPRequestTimeout; timeout > 0 {
 -              var cancel context.CancelFunc
 -              ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(timeout)))
 -              defer cancel()
 -      }
 +type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
 +
 +func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
 +      return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 +              middleware(w, req, next)
 +      })
 +}
  
 -      reqOut := (&http.Request{
 -              Method: reqIn.Method,
 -              URL:    urlOut,
 -              Host:   reqIn.Host,
 -              Header: hdrOut,
 -              Body:   reqIn.Body,
 -      }).WithContext(ctx)
 -      resp, err := arvados.InsecureHTTPClient.Do(reqOut)
 +func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
 +      urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
        if err != nil {
                httpserver.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
 -      for k, v := range resp.Header {
 -              for _, v := range v {
 -                      w.Header().Add(k, v)
 -              }
 +      urlOut = &url.URL{
 +              Scheme:   urlOut.Scheme,
 +              Host:     urlOut.Host,
 +              Path:     req.URL.Path,
 +              RawPath:  req.URL.RawPath,
 +              RawQuery: req.URL.RawQuery,
        }
 -      w.WriteHeader(resp.StatusCode)
 -      n, err := io.Copy(w, resp.Body)
 -      if err != nil {
 -              httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
 +      client := h.secureClient
 +      if insecure {
 +              client = h.insecureClient
        }
 +      h.proxy.Do(w, req, urlOut, client)
  }
  
  // For now, findRailsAPI always uses the rails API running on this
  // node.
 -func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, error) {
 +func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, bool, error) {
        hostport := np.RailsAPI.Listen
        if len(hostport) > 1 && hostport[0] == ':' && strings.TrimRight(hostport[1:], "0123456789") == "" {
                // ":12345" => connect to indicated port on localhost
        } else if _, _, err := net.SplitHostPort(hostport); err == nil {
                // "[::1]:12345" => connect to indicated address & port
        } else {
 -              return nil, err
 +              return nil, false, err
        }
        proto := "http"
        if np.RailsAPI.TLS {
                proto = "https"
        }
 -      return url.Parse(proto + "://" + hostport)
 +      url, err := url.Parse(proto + "://" + hostport)
 +      return url, np.RailsAPI.Insecure, err
  }
index 2e833ed18dba92602b12af903687e63e780d8dc0,eb947ea363705293679da1edd3430e2d8d5c0657..2f9280e11bc1a99f6822b3ef2c8a37d8133974b4
@@@ -34,12 -34,11 +34,12 @@@ type HandlerSuite struct 
  
  func (s *HandlerSuite) SetUpTest(c *check.C) {
        s.cluster = &arvados.Cluster{
 -              ClusterID: "zzzzz",
 +              ClusterID:  "zzzzz",
 +              PostgreSQL: integrationTestCluster().PostgreSQL,
                NodeProfiles: map[string]arvados.NodeProfile{
                        "*": {
                                Controller: arvados.SystemServiceInstance{Listen: ":"},
 -                              RailsAPI:   arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true},
 +                              RailsAPI:   arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true, Insecure: true},
                        },
                },
        }
@@@ -66,12 -65,12 +66,12 @@@ func (s *HandlerSuite) TestRequestTimeo
        req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
 -      c.Check(resp.Code, check.Equals, http.StatusInternalServerError)
 +      c.Check(resp.Code, check.Equals, http.StatusBadGateway)
        var jresp httpserver.ErrorResponse
        err := json.Unmarshal(resp.Body.Bytes(), &jresp)
        c.Check(err, check.IsNil)
        c.Assert(len(jresp.Errors), check.Equals, 1)
 -      c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded`)
 +      c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded.*`)
  }
  
  func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) {
@@@ -102,7 -101,6 +102,7 @@@ func (s *HandlerSuite) TestProxyWithTok
                "_method":   {"GET"},
                "api_token": {arvadostest.ActiveToken},
        }.Encode()))
 +      req.Header.Set("Content-type", "application/x-www-form-urlencoded")
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
        c.Check(resp.Code, check.Equals, http.StatusOK)
@@@ -122,3 -120,11 +122,11 @@@ func (s *HandlerSuite) TestProxyNotFoun
        c.Check(err, check.IsNil)
        c.Check(jresp["errors"], check.FitsTypeOf, []interface{}{})
  }
+ func (s *HandlerSuite) TestProxyRedirect(c *check.C) {
+       req := httptest.NewRequest("GET", "https://example.org:1234/login?return_to=foo", nil)
+       resp := httptest.NewRecorder()
+       s.handler.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusFound)
+       c.Check(resp.Header().Get("Location"), check.Matches, `https://example\.org:1234/auth/joshid\?return_to=foo&?`)
+ }
diff --combined sdk/go/arvados/config.go
index 608bc223b4a3e96c160c7739db6426ff27acbb80,353901855683f296811a42e64b008568071dbdad..6edd18418bb8015087f8b486acf6ee21d2d26db4
@@@ -5,6 -5,8 +5,8 @@@
  package arvados
  
  import (
+       "encoding/json"
+       "errors"
        "fmt"
        "os"
  
@@@ -52,41 -54,60 +54,81 @@@ type Cluster struct 
        ClusterID          string `json:"-"`
        ManagementToken    string
        NodeProfiles       map[string]NodeProfile
-       InstanceTypes      []InstanceType
+       InstanceTypes      InstanceTypeMap
        HTTPRequestTimeout Duration
 +      RemoteClusters     map[string]RemoteCluster
 +      PostgreSQL         PostgreSQL
 +}
 +
 +type PostgreSQL struct {
 +      Connection     PostgreSQLConnection
 +      ConnectionPool int
 +}
 +
 +type PostgreSQLConnection map[string]string
 +
 +type RemoteCluster struct {
 +      // API endpoint host or host:port; default is {id}.arvadosapi.com
 +      Host string
 +      // Perform a proxy request when a local client requests an
 +      // object belonging to this remote.
 +      Proxy bool
 +      // Scheme, default "https". Can be set to "http" for testing.
 +      Scheme string
 +      // Disable TLS verify. Can be set to true for testing.
 +      Insecure bool
  }
  
  type InstanceType struct {
        Name         string
        ProviderType string
        VCPUs        int
-       RAM          int64
-       Scratch      int64
+       RAM          ByteSize
+       Scratch      ByteSize
        Price        float64
        Preemptible  bool
  }
  
+ type InstanceTypeMap map[string]InstanceType
+ var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
+ // UnmarshalJSON handles old config files that provide an array of
+ // instance types instead of a hash.
+ func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
+       if len(data) > 0 && data[0] == '[' {
+               var arr []InstanceType
+               err := json.Unmarshal(data, &arr)
+               if err != nil {
+                       return err
+               }
+               if len(arr) == 0 {
+                       *it = nil
+                       return nil
+               }
+               *it = make(map[string]InstanceType, len(arr))
+               for _, t := range arr {
+                       if _, ok := (*it)[t.Name]; ok {
+                               return errDuplicateInstanceTypeName
+                       }
+                       (*it)[t.Name] = t
+               }
+               return nil
+       }
+       var hash map[string]InstanceType
+       err := json.Unmarshal(data, &hash)
+       if err != nil {
+               return err
+       }
+       // Fill in Name field using hash key.
+       *it = InstanceTypeMap(hash)
+       for name, t := range *it {
+               t.Name = name
+               (*it)[name] = t
+       }
+       return nil
+ }
  // GetNodeProfile returns a NodeProfile for the given hostname. An
  // error is returned if the appropriate configuration can't be
  // determined (e.g., this does not appear to be a system node). If
@@@ -151,7 -172,6 +193,7 @@@ func (np *NodeProfile) ServicePorts() m
  }
  
  type SystemServiceInstance struct {
 -      Listen string
 -      TLS    bool
 +      Listen   string
 +      TLS      bool
 +      Insecure bool
  }
index 05cefbc9e58fcd4b289495c9269f501e3f1d04a0,102433cd4186fbf392d8f2fc56af804bdec4d890..8df95553d49e825db63286d9125077b6c53682a6
@@@ -174,7 -174,7 +174,7 @@@ def find_available_port()
      sock.close()
      return port
  
- def _wait_until_port_listens(port, timeout=10):
+ def _wait_until_port_listens(port, timeout=10, warn=True):
      """Wait for a process to start listening on the given port.
  
      If nothing listens on the port within the specified timeout (given
          except subprocess.CalledProcessError:
              time.sleep(0.1)
              continue
-         return
-     print(
-         "WARNING: Nothing is listening on port {} (waited {} seconds).".
-         format(port, timeout),
-         file=sys.stderr)
+         return True
+     if warn:
+         print(
+             "WARNING: Nothing is listening on port {} (waited {} seconds).".
+             format(port, timeout),
+             file=sys.stderr)
+     return False
  
  def _logfilename(label):
      """Set up a labelled log file, and return a path to write logs to.
@@@ -375,8 -377,11 +377,11 @@@ def reset()
          'POST',
          headers={'Authorization': 'OAuth2 {}'.format(token)})
      os.environ['ARVADOS_API_HOST_INSECURE'] = 'true'
-     os.environ['ARVADOS_API_HOST'] = existing_api_host
      os.environ['ARVADOS_API_TOKEN'] = token
+     if _wait_until_port_listens(_getport('controller-ssl'), timeout=0.5, warn=False):
+         os.environ['ARVADOS_API_HOST'] = '0.0.0.0:'+str(_getport('controller-ssl'))
+     else:
+         os.environ['ARVADOS_API_HOST'] = existing_api_host
  
  def stop(force=False):
      """Stop the API server, if one is running.
@@@ -408,13 -413,6 +413,13 @@@ def run_controller()
          f.write("""
  Clusters:
    zzzzz:
 +    PostgreSQL:
 +      ConnectionPool: 32
 +      Connection:
 +        host: {}
 +        dbname: {}
 +        user: {}
 +        password: {}
      NodeProfiles:
        "*":
          "arvados-controller":
          "arvados-api-server":
            Listen: ":{}"
            TLS: true
 -        """.format(port, rails_api_port))
 +          Insecure: true
 +        """.format(
 +            _dbconfig('host'),
 +            _dbconfig('database'),
 +            _dbconfig('username'),
 +            _dbconfig('password'),
 +            port,
 +            rails_api_port,
 +        ))
      logf = open(_logfilename('controller'), 'a')
      controller = subprocess.Popen(
          ["arvados-server", "controller", "-config", conf],
@@@ -649,7 -639,7 +654,7 @@@ def run_keep_web()
      keepweb = subprocess.Popen(
          ['keep-web',
           '-allow-anonymous',
-          '-attachment-only-host=download:'+str(keepwebport),
+          '-attachment-only-host=download',
           '-listen=:'+str(keepwebport)],
          env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
      with open(_pidfile('keep-web'), 'w') as f: