--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bytes"
+ "database/sql"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "regexp"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+// wfRe matches the path of a request for a single workflow by UUID.
+// Its one capture group is the five-character cluster ID prefix of
+// the UUID.
+var wfRe = regexp.MustCompile(`^/arvados/v1/workflows/([0-9a-z]{5})-[^/]+$`)
+
+// proxyRemoteCluster is a middleware that forwards single-workflow
+// requests to the cluster named by the UUID prefix in the request
+// path.
+//
+// Requests whose path doesn't match wfRe, or whose UUID prefix is
+// the local cluster ID, are passed to next. If the prefix has no
+// entry in RemoteClusters the response is 404; if the caller's token
+// can't be prepared for the remote cluster the response is 400.
+func (h *Handler) proxyRemoteCluster(w http.ResponseWriter, req *http.Request, next http.Handler) {
+	match := wfRe.FindStringSubmatch(req.URL.Path)
+	if len(match) < 2 {
+		next.ServeHTTP(w, req)
+		return
+	}
+	clusterID := match[1]
+	if clusterID == h.Cluster.ClusterID {
+		next.ServeHTTP(w, req)
+		return
+	}
+	rc, found := h.Cluster.RemoteClusters[clusterID]
+	if !found {
+		httpserver.Error(w, "no proxy available for cluster "+clusterID, http.StatusNotFound)
+		return
+	}
+	if err := h.saltAuthToken(req, clusterID); err != nil {
+		httpserver.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+	// Build the target URL only after saltAuthToken has run,
+	// because it may rewrite req.URL.RawQuery.
+	target := &url.URL{
+		Scheme:   "https",
+		Host:     rc.Host,
+		Path:     req.URL.Path,
+		RawPath:  req.URL.RawPath,
+		RawQuery: req.URL.RawQuery,
+	}
+	if rc.Scheme != "" {
+		target.Scheme = rc.Scheme
+	}
+	client := h.insecureClient
+	if !rc.Insecure {
+		client = h.secureClient
+	}
+	h.proxy.Do(w, req, target, client)
+}
+
+// saltAuthToken extracts the auth token supplied in req, and
+// replaces it with a salted token for the remote cluster.
+//
+// The token is looked for in the request headers and query string
+// first, and then -- only if none was found and the request is
+// form-encoded -- in the request body. If no token is found, req is
+// left without an Authorization header and nil is returned.
+//
+// On success the resulting token is placed in the Authorization
+// header, and api_token is removed from the query string (and from
+// the re-encoded form body, if the body was parsed).
+//
+// A non-nil error means the token could not be salted, the local
+// database could not be consulted, or the request could not be
+// parsed.
+func (h *Handler) saltAuthToken(req *http.Request, remote string) error {
+	creds := auth.NewCredentials()
+	creds.LoadTokensFromHTTPRequest(req)
+	// This must be the same media type that
+	// LoadTokensFromHTTPRequestBody accepts; otherwise the body is
+	// never inspected.
+	if len(creds.Tokens) == 0 && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+		// Override ParseForm's 10MiB limit by ensuring
+		// req.Body is a *http.maxBytesReader.
+		req.Body = http.MaxBytesReader(nil, req.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
+		if err := creds.LoadTokensFromHTTPRequestBody(req); err != nil {
+			return err
+		}
+		// Replace req.Body with a buffer that re-encodes the
+		// form without api_token, in case we end up
+		// forwarding the request.
+		if req.PostForm != nil {
+			req.PostForm.Del("api_token")
+		}
+		req.Body = ioutil.NopCloser(bytes.NewBufferString(req.PostForm.Encode()))
+	}
+	if len(creds.Tokens) == 0 {
+		return nil
+	}
+	token, err := auth.SaltToken(creds.Tokens[0], remote)
+	if err == auth.ErrObsoleteToken {
+		// If the token exists in our own database, salt it
+		// for the remote. Otherwise, assume it was issued by
+		// the remote, and pass it through unmodified.
+		db, err := h.db(req)
+		if err != nil {
+			return err
+		}
+		aca := arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}
+		err = db.QueryRowContext(req.Context(), `SELECT uuid FROM api_client_authorizations WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, aca.APIToken).Scan(&aca.UUID)
+		if err == sql.ErrNoRows {
+			// Not ours; pass through unmodified.
+			token = aca.APIToken
+		} else if err != nil {
+			return err
+		} else {
+			// Found; make V2 version and salt it.
+			token, err = auth.SaltToken(aca.TokenV2(), remote)
+			if err != nil {
+				return err
+			}
+		}
+	} else if err != nil {
+		return err
+	}
+	req.Header.Set("Authorization", "Bearer "+token)
+
+	// Remove api_token=... from the query string, in case we
+	// end up forwarding the request.
+	if values, err := url.ParseQuery(req.URL.RawQuery); err != nil {
+		return err
+	} else if _, ok := values["api_token"]; ok {
+		delete(values, "api_token")
+		req.URL.RawQuery = values.Encode()
+	}
+	return nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+var _ = check.Suite(&FederationSuite{})
+
+// FederationSuite tests how the controller handles workflow requests
+// whose UUID refers to another cluster. SetUpTest starts the three
+// servers described below for every test.
+type FederationSuite struct {
+ log *logrus.Logger
+ // testServer and testHandler are the controller being tested,
+ // "zhome".
+ testServer *httpserver.Server
+ testHandler *Handler
+ // remoteServer ("zzzzz") forwards requests to the Rails API
+ // provided by the integration test environment.
+ remoteServer *httpserver.Server
+ // remoteMock ("zmock") appends each incoming request to
+ // remoteMockRequests, and returns an empty 200 response.
+ remoteMock *httpserver.Server
+ remoteMockRequests []http.Request
+}
+
+// SetUpTest starts remoteServer and remoteMock, then configures and
+// starts the "zhome" controller under test with RemoteClusters
+// entries ("zzzzz" and "zmock") pointing at their listening
+// addresses. It also resets the list of requests recorded by the
+// mock.
+func (s *FederationSuite) SetUpTest(c *check.C) {
+ s.log = logrus.New()
+ s.log.Formatter = &logrus.JSONFormatter{}
+ s.log.Out = &logWriter{c.Log}
+
+ s.remoteServer = newServerFromIntegrationTestEnv(c)
+ c.Assert(s.remoteServer.Start(), check.IsNil)
+
+ // remoteMock reuses the integration-test server setup but
+ // replaces its handler with the request recorder.
+ s.remoteMock = newServerFromIntegrationTestEnv(c)
+ s.remoteMock.Server.Handler = http.HandlerFunc(s.remoteMockHandler)
+ c.Assert(s.remoteMock.Start(), check.IsNil)
+
+ nodeProfile := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: ":1"}, // local reqs will error "connection refused"
+ }
+ s.testHandler = &Handler{Cluster: &arvados.Cluster{
+ ClusterID: "zhome",
+ PostgreSQL: integrationTestCluster().PostgreSQL,
+ NodeProfiles: map[string]arvados.NodeProfile{
+ "*": nodeProfile,
+ },
+ }, NodeProfile: &nodeProfile}
+ s.testServer = newServerFromIntegrationTestEnv(c)
+ s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler))
+
+ // The remote servers must already be started here, because
+ // their Addr fields are used as the remote cluster hosts.
+ s.testHandler.Cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+ "zzzzz": {
+ Host: s.remoteServer.Addr,
+ Proxy: true,
+ Scheme: "http",
+ },
+ "zmock": {
+ Host: s.remoteMock.Addr,
+ Proxy: true,
+ Scheme: "http",
+ },
+ }
+
+ c.Assert(s.testServer.Start(), check.IsNil)
+
+ s.remoteMockRequests = nil
+}
+
+// remoteMockHandler is the HTTP handler for remoteMock. It appends a
+// copy of req to s.remoteMockRequests and writes nothing to w.
+func (s *FederationSuite) remoteMockHandler(w http.ResponseWriter, req *http.Request) {
+ s.remoteMockRequests = append(s.remoteMockRequests, *req)
+}
+
+// TearDownTest shuts down every server that SetUpTest may have
+// started. Servers that were never created (nil) are skipped, so
+// this is safe to run after a partially failed SetUpTest.
+func (s *FederationSuite) TearDownTest(c *check.C) {
+	// remoteMock is included so its listener doesn't outlive the
+	// test that started it.
+	for _, srv := range []*httpserver.Server{s.remoteServer, s.remoteMock, s.testServer} {
+		if srv != nil {
+			srv.Close()
+		}
+	}
+}
+
+// testRequest passes req directly to the test controller's handler
+// stack, without a network round trip, and returns the recorded
+// response.
+func (s *FederationSuite) testRequest(req *http.Request) *http.Response {
+	rec := httptest.NewRecorder()
+	handler := s.testServer.Server.Handler
+	handler.ServeHTTP(rec, req)
+	return rec.Result()
+}
+
+// TestLocalRequest checks that a workflow UUID carrying the local
+// cluster's own prefix ("zhome") is handled locally rather than
+// proxied to a remote cluster.
+func (s *FederationSuite) TestLocalRequest(c *check.C) {
+	localUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zhome-", 1)
+	resp := s.testRequest(httptest.NewRequest("GET", "/arvados/v1/workflows/"+localUUID, nil))
+	s.checkHandledLocally(c, resp)
+}
+
+// checkHandledLocally asserts that resp is the 502 "connection
+// refused" error produced when the controller under test tries to
+// reach its (nonexistent) local Rails API.
+func (s *FederationSuite) checkHandledLocally(c *check.C, resp *http.Response) {
+ // Our "home" controller can't handle local requests because
+ // it doesn't have its own stub/test Rails API, so we rely on
+ // "connection refused" to indicate the controller tried to
+ // proxy the request to its local Rails API.
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadGateway)
+ s.checkJSONErrorMatches(c, resp, `.*connection refused`)
+}
+
+// TestNoAuth expects a 401 "Not logged in" response when a remote
+// workflow is requested without any token.
+func (s *FederationSuite) TestNoAuth(c *check.C) {
+	path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+	resp := s.testRequest(httptest.NewRequest("GET", path, nil))
+	c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+	s.checkJSONErrorMatches(c, resp, `Not logged in`)
+}
+
+// TestBadAuth expects a 401 "Not logged in" response when a remote
+// workflow is requested with a token that is not a valid credential.
+func (s *FederationSuite) TestBadAuth(c *check.C) {
+	const bogusToken = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+	path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+	req := httptest.NewRequest("GET", path, nil)
+	req.Header.Set("Authorization", "Bearer "+bogusToken)
+	resp := s.testRequest(req)
+	c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+	s.checkJSONErrorMatches(c, resp, `Not logged in`)
+}
+
+// TestNoAccess expects a 404 "not found" response when the workflow
+// is requested with the spectator user's token.
+func (s *FederationSuite) TestNoAccess(c *check.C) {
+	path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+	req := httptest.NewRequest("GET", path, nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.SpectatorToken)
+	resp := s.testRequest(req)
+	c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+	s.checkJSONErrorMatches(c, resp, `.*not found`)
+}
+
+// TestGetUnknownRemote expects a 404 response naming the cluster
+// when the UUID prefix ("zz404") has no RemoteClusters entry.
+func (s *FederationSuite) TestGetUnknownRemote(c *check.C) {
+	unknownUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zz404-", 1)
+	req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+unknownUUID, nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+	c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+	s.checkJSONErrorMatches(c, resp, `.*no proxy available for cluster zz404`)
+}
+
+// TestRemoteError expects a 502 response carrying the client's error
+// message when the remote can't be reached as configured. The
+// failure is provoked by switching the "zzzzz" remote to https while
+// SetUpTest configured it as plain http.
+func (s *FederationSuite) TestRemoteError(c *check.C) {
+	remotes := s.testHandler.Cluster.RemoteClusters
+	broken := remotes["zzzzz"]
+	broken.Scheme = "https"
+	remotes["zzzzz"] = broken
+
+	path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+	req := httptest.NewRequest("GET", path, nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+	c.Check(resp.StatusCode, check.Equals, http.StatusBadGateway)
+	s.checkJSONErrorMatches(c, resp, `.*HTTP response to HTTPS client`)
+}
+
+// TestGetRemoteWorkflow fetches a "zzzzz" workflow through the
+// controller under test with the active user's token, and checks the
+// UUID and owner of the decoded result.
+func (s *FederationSuite) TestGetRemoteWorkflow(c *check.C) {
+	path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+	req := httptest.NewRequest("GET", path, nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+	c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+	var got arvados.Workflow
+	decodeErr := json.NewDecoder(resp.Body).Decode(&got)
+	c.Check(decodeErr, check.IsNil)
+	c.Check(got.UUID, check.Equals, arvadostest.WorkflowWithDefinitionYAMLUUID)
+	c.Check(got.OwnerUUID, check.Equals, arvadostest.ActiveUserUUID)
+}
+
+// TestOptionsMethod sends an OPTIONS request with an Origin header
+// and no token, and expects an empty 200 response with
+// Access-Control-Allow-* headers covering the listed headers and
+// methods.
+func (s *FederationSuite) TestOptionsMethod(c *check.C) {
+	path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+	req := httptest.NewRequest("OPTIONS", path, nil)
+	req.Header.Set("Origin", "https://example.com")
+	resp := s.testRequest(req)
+	c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+	body, err := ioutil.ReadAll(resp.Body)
+	c.Check(err, check.IsNil)
+	c.Check(string(body), check.Equals, "")
+
+	c.Check(resp.Header.Get("Access-Control-Allow-Origin"), check.Equals, "*")
+	allowedHeaders := resp.Header.Get("Access-Control-Allow-Headers")
+	for _, hdr := range []string{"Authorization", "Content-Type"} {
+		c.Check(allowedHeaders, check.Matches, ".*"+hdr+".*")
+	}
+	allowedMethods := resp.Header.Get("Access-Control-Allow-Methods")
+	for _, method := range []string{"GET", "HEAD", "PUT", "POST", "DELETE"} {
+		c.Check(allowedMethods, check.Matches, ".*"+method+".*")
+	}
+}
+
+// TestRemoteWithTokenInQuery sends a request for a "zmock" workflow
+// with the token in the api_token query parameter, then inspects the
+// request the mock remote received.
+func (s *FederationSuite) TestRemoteWithTokenInQuery(c *check.C) {
+	mockUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1)
+	target := "/arvados/v1/workflows/" + mockUUID + "?api_token=" + arvadostest.ActiveToken
+	s.testRequest(httptest.NewRequest("GET", target, nil))
+	c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+	proxied := s.remoteMockRequests[0]
+	// Token is salted and moved from query to Authorization header.
+	c.Check(proxied.URL.String(), check.Not(check.Matches), `.*api_token=.*`)
+	c.Check(proxied.Header.Get("Authorization"), check.Equals, "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/7fd31b61f39c0e82a4155592163218272cedacdc")
+}
+
+// TestLocalTokenSalted checks that a v1 token found in the local
+// database is converted to a salted v2 token before the request is
+// forwarded to the "zmock" remote.
+func (s *FederationSuite) TestLocalTokenSalted(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1), nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ s.testRequest(req)
+ c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+ pr := s.remoteMockRequests[0]
+ // The salted token here has a "zzzzz-" UUID instead of a
+ // "zhome-" UUID because zhome's local database has the
+ // "zzzzz-" test fixtures. The "secret" part is HMAC(sha1,
+ // arvadostest.ActiveToken, "zmock") = "7fd3...".
+ c.Check(pr.Header.Get("Authorization"), check.Equals, "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/7fd31b61f39c0e82a4155592163218272cedacdc")
+}
+
+// TestRemoteTokenNotSalted checks that a v1 token that is not found
+// in the local database is forwarded to the remote unmodified.
+func (s *FederationSuite) TestRemoteTokenNotSalted(c *check.C) {
+ // remoteToken can be any v1 token that doesn't appear in
+ // zhome's local db.
+ remoteToken := "abcdef00000000000000000000000000000000000000000000"
+ req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1), nil)
+ req.Header.Set("Authorization", "Bearer "+remoteToken)
+ s.testRequest(req)
+ c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+ pr := s.remoteMockRequests[0]
+ c.Check(pr.Header.Get("Authorization"), check.Equals, "Bearer "+remoteToken)
+}
+
+// TestWorkflowCRUD creates a workflow directly on the remote
+// ("zzzzz") server, then updates it (via PATCH, PUT, and POST with
+// _method=PATCH), deletes it, and confirms it is gone -- all through
+// the controller under test.
+func (s *FederationSuite) TestWorkflowCRUD(c *check.C) {
+	wf := arvados.Workflow{
+		Description: "TestCRUD",
+	}
+	{
+		body := &strings.Builder{}
+		c.Assert(json.NewEncoder(body).Encode(&wf), check.IsNil)
+		req := httptest.NewRequest("POST", "/arvados/v1/workflows", strings.NewReader(url.Values{
+			"workflow": {body.String()},
+		}.Encode()))
+		req.Header.Set("Content-type", "application/x-www-form-urlencoded")
+		req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+		rec := httptest.NewRecorder()
+		s.remoteServer.Server.Handler.ServeHTTP(rec, req) // direct to remote -- can't proxy a create req because no uuid
+		resp := rec.Result()
+		s.checkResponseOK(c, resp)
+		// Stop here if the response can't be decoded: without
+		// a UUID there is nothing to update or clean up.
+		c.Assert(json.NewDecoder(resp.Body).Decode(&wf), check.IsNil)
+
+		// Best-effort cleanup in case the test fails before
+		// the DELETE step below.
+		defer func() {
+			req := httptest.NewRequest("DELETE", "/arvados/v1/workflows/"+wf.UUID, nil)
+			req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+			s.remoteServer.Server.Handler.ServeHTTP(httptest.NewRecorder(), req)
+		}()
+		c.Check(wf.UUID, check.Not(check.Equals), "")
+
+		c.Assert(wf.ModifiedAt, check.NotNil)
+		c.Logf("wf.ModifiedAt: %v", wf.ModifiedAt)
+		c.Check(time.Since(*wf.ModifiedAt) < time.Minute, check.Equals, true)
+	}
+	for _, method := range []string{"PATCH", "PUT", "POST"} {
+		form := url.Values{
+			"workflow": {`{"description": "Updated with ` + method + `"}`},
+		}
+		if method == "POST" {
+			form["_method"] = []string{"PATCH"}
+		}
+		req := httptest.NewRequest(method, "/arvados/v1/workflows/"+wf.UUID, strings.NewReader(form.Encode()))
+		req.Header.Set("Content-type", "application/x-www-form-urlencoded")
+		req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+		resp := s.testRequest(req)
+		s.checkResponseOK(c, resp)
+		err := json.NewDecoder(resp.Body).Decode(&wf)
+		c.Check(err, check.IsNil)
+
+		c.Check(wf.Description, check.Equals, "Updated with "+method)
+	}
+	{
+		req := httptest.NewRequest("DELETE", "/arvados/v1/workflows/"+wf.UUID, nil)
+		req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+		resp := s.testRequest(req)
+		s.checkResponseOK(c, resp)
+		err := json.NewDecoder(resp.Body).Decode(&wf)
+		c.Check(err, check.IsNil)
+	}
+	{
+		req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+wf.UUID, nil)
+		req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+		resp := s.testRequest(req)
+		c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+	}
+}
+
+// checkResponseOK records a test failure unless resp has status 200.
+// On failure it also logs the response body, which consumes
+// resp.Body.
+func (s *FederationSuite) checkResponseOK(c *check.C, resp *http.Response) {
+	c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+	if resp.StatusCode == http.StatusOK {
+		return
+	}
+	body, err := ioutil.ReadAll(resp.Body)
+	c.Logf("... response body = %q, %v\n", body, err)
+}
+
+// checkJSONErrorMatches decodes resp.Body as an
+// httpserver.ErrorResponse and checks that it holds exactly one
+// error message, matching the regular expression re. The test is
+// aborted if the number of messages is not one.
+func (s *FederationSuite) checkJSONErrorMatches(c *check.C, resp *http.Response, re string) {
+	var decoded httpserver.ErrorResponse
+	decodeErr := json.NewDecoder(resp.Body).Decode(&decoded)
+	c.Check(decodeErr, check.IsNil)
+	c.Assert(len(decoded.Errors), check.Equals, 1)
+	c.Check(decoded.Errors[0], check.Matches, re)
+}
package controller
import (
- "context"
- "io"
+ "database/sql"
+ "errors"
"net"
"net/http"
"net/url"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
+ _ "github.com/lib/pq"
)
type Handler struct {
Cluster *arvados.Cluster
NodeProfile *arvados.NodeProfile
- setupOnce sync.Once
- handlerStack http.Handler
- proxyClient *arvados.Client
+ setupOnce sync.Once
+ handlerStack http.Handler
+ proxy *proxy
+ secureClient *http.Client
+ insecureClient *http.Client
+ pgdb *sql.DB
+ pgdbMtx sync.Mutex
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (h *Handler) CheckHealth() error {
h.setupOnce.Do(h.setup)
- _, err := findRailsAPI(h.Cluster, h.NodeProfile)
+ _, _, err := findRailsAPI(h.Cluster, h.NodeProfile)
return err
}
Token: h.Cluster.ManagementToken,
Prefix: "/_health/",
})
- mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
+ hs := http.NotFoundHandler()
+ hs = prepend(hs, h.proxyRailsAPI)
+ hs = prepend(hs, h.proxyRemoteCluster)
+ mux.Handle("/", hs)
h.handlerStack = mux
+ sc := *arvados.DefaultSecureClient
+ sc.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
+ h.secureClient = &sc
+
+ ic := *arvados.InsecureHTTPClient
+ ic.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
+ h.insecureClient = &ic
+
+ h.proxy = &proxy{
+ Name: "arvados-controller",
+ RequestTimeout: time.Duration(h.Cluster.HTTPRequestTimeout),
+ }
+
// Changing the global isn't the right way to do this, but a
// proper solution would conflict with an impending 13493
// merge anyway, so this will do for now.
arvados.InsecureHTTPClient.CheckRedirect = func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse }
}
-// headers that shouldn't be forwarded when proxying. See
-// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
-var dropHeaders = map[string]bool{
- "Connection": true,
- "Keep-Alive": true,
- "Proxy-Authenticate": true,
- "Proxy-Authorization": true,
- "TE": true,
- "Trailer": true,
- "Transfer-Encoding": true,
- "Upgrade": true,
-}
+// errDBConnection is returned by (*Handler).db when the database
+// can't be opened or pinged. The underlying error is logged there
+// instead of being returned to the caller.
+var errDBConnection = errors.New("database connection error")
-func (h *Handler) proxyRailsAPI(w http.ResponseWriter, reqIn *http.Request) {
- urlOut, err := findRailsAPI(h.Cluster, h.NodeProfile)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- urlOut = &url.URL{
- Scheme: urlOut.Scheme,
- Host: urlOut.Host,
- Path: reqIn.URL.Path,
- RawPath: reqIn.URL.RawPath,
- RawQuery: reqIn.URL.RawQuery,
+// db returns the handler's shared database handle, opening and
+// pinging it on first use. A handle is cached only after a
+// successful ping, so a failed attempt is retried on the next call.
+//
+// On failure the underlying error is logged using req's logger and
+// errDBConnection is returned.
+func (h *Handler) db(req *http.Request) (*sql.DB, error) {
+ h.pgdbMtx.Lock()
+ defer h.pgdbMtx.Unlock()
+ if h.pgdb != nil {
+ return h.pgdb, nil
 }
- // Copy headers from incoming request, then add/replace proxy
- // headers like Via and X-Forwarded-For.
- hdrOut := http.Header{}
- for k, v := range reqIn.Header {
- if !dropHeaders[k] {
- hdrOut[k] = v
- }
+ db, err := sql.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
+ if err != nil {
+ httpserver.Logger(req).WithError(err).Error("postgresql connect failed")
+ return nil, errDBConnection
 }
- xff := reqIn.RemoteAddr
- if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
- xff = xffIn + "," + xff
+ if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
+ db.SetMaxOpenConns(p)
 }
- hdrOut.Set("X-Forwarded-For", xff)
- if hdrOut.Get("X-Forwarded-Proto") == "" {
- hdrOut.Set("X-Forwarded-Proto", reqIn.URL.Scheme)
+ if err := db.Ping(); err != nil {
+ httpserver.Logger(req).WithError(err).Error("postgresql connect succeeded but ping failed")
+ // This handle is not cached, so release it here;
+ // otherwise each failed attempt leaks a pool.
+ db.Close()
+ return nil, errDBConnection
 }
- hdrOut.Add("Via", reqIn.Proto+" arvados-controller")
+ h.pgdb = db
+ return db, nil
+}
- ctx := reqIn.Context()
- if timeout := h.Cluster.HTTPRequestTimeout; timeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(timeout)))
- defer cancel()
- }
+// middlewareFunc handles a request itself, or delegates it to the
+// supplied next handler.
+type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
+
+// prepend returns a handler that calls middleware for every request,
+// giving it next as the handler to delegate to.
+func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
+	wrapped := func(w http.ResponseWriter, req *http.Request) {
+		middleware(w, req, next)
+	}
+	return http.HandlerFunc(wrapped)
+}
- reqOut := (&http.Request{
- Method: reqIn.Method,
- URL: urlOut,
- Host: reqIn.Host,
- Header: hdrOut,
- Body: reqIn.Body,
- }).WithContext(ctx)
- resp, err := arvados.InsecureHTTPClient.Do(reqOut)
+func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
+ urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- for k, v := range resp.Header {
- for _, v := range v {
- w.Header().Add(k, v)
- }
+ urlOut = &url.URL{
+ Scheme: urlOut.Scheme,
+ Host: urlOut.Host,
+ Path: req.URL.Path,
+ RawPath: req.URL.RawPath,
+ RawQuery: req.URL.RawQuery,
}
- w.WriteHeader(resp.StatusCode)
- n, err := io.Copy(w, resp.Body)
- if err != nil {
- httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
+ client := h.secureClient
+ if insecure {
+ client = h.insecureClient
}
+ h.proxy.Do(w, req, urlOut, client)
}
// For now, findRailsAPI always uses the rails API running on this
// node.
-func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, error) {
+func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, bool, error) {
hostport := np.RailsAPI.Listen
if len(hostport) > 1 && hostport[0] == ':' && strings.TrimRight(hostport[1:], "0123456789") == "" {
// ":12345" => connect to indicated port on localhost
} else if _, _, err := net.SplitHostPort(hostport); err == nil {
// "[::1]:12345" => connect to indicated address & port
} else {
- return nil, err
+ return nil, false, err
}
proto := "http"
if np.RailsAPI.TLS {
proto = "https"
}
- return url.Parse(proto + "://" + hostport)
+ url, err := url.Parse(proto + "://" + hostport)
+ return url, np.RailsAPI.Insecure, err
}
func (s *HandlerSuite) SetUpTest(c *check.C) {
s.cluster = &arvados.Cluster{
- ClusterID: "zzzzz",
+ ClusterID: "zzzzz",
+ PostgreSQL: integrationTestCluster().PostgreSQL,
NodeProfiles: map[string]arvados.NodeProfile{
"*": {
Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true},
+ RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true, Insecure: true},
},
},
}
req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
resp := httptest.NewRecorder()
s.handler.ServeHTTP(resp, req)
- c.Check(resp.Code, check.Equals, http.StatusInternalServerError)
+ c.Check(resp.Code, check.Equals, http.StatusBadGateway)
var jresp httpserver.ErrorResponse
err := json.Unmarshal(resp.Body.Bytes(), &jresp)
c.Check(err, check.IsNil)
c.Assert(len(jresp.Errors), check.Equals, 1)
- c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded`)
+ c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded.*`)
}
func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) {
"_method": {"GET"},
"api_token": {arvadostest.ActiveToken},
}.Encode()))
+ req.Header.Set("Content-type", "application/x-www-form-urlencoded")
resp := httptest.NewRecorder()
s.handler.ServeHTTP(resp, req)
c.Check(resp.Code, check.Equals, http.StatusOK)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "context"
+ "io"
+ "net/http"
+ "net/url"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+// proxy relays HTTP requests to another server; see (*proxy).Do.
+type proxy struct {
+ Name string // to use in Via header
+ // RequestTimeout, if greater than zero, is the deadline
+ // applied to each outgoing request.
+ RequestTimeout time.Duration
+}
+
+// dropHeaders lists the headers that shouldn't be forwarded when
+// proxying. See
+// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
+//
+// The map is indexed directly with the keys of the incoming
+// request's http.Header, so entries must use the same
+// capitalization as those keys.
+var dropHeaders = map[string]bool{
+ "Connection": true,
+ "Keep-Alive": true,
+ "Proxy-Authenticate": true,
+ "Proxy-Authorization": true,
+ "TE": true,
+ "Trailer": true,
+ "Transfer-Encoding": true,
+ "Upgrade": true,
+}
+
+// Do forwards reqIn to urlOut using client, and relays the response
+// status, headers, and body to w.
+//
+// The outgoing request carries reqIn's method, body, and headers
+// (minus those listed in dropHeaders), with the client's address
+// appended to X-Forwarded-For and a Via entry added. If
+// p.RequestTimeout is greater than zero it bounds the whole
+// exchange, including copying the response body.
+//
+// If the outgoing request fails, a 502 error response is written to
+// w. An error while copying the response body is only logged,
+// because the status has already been sent by then.
+func (p *proxy) Do(w http.ResponseWriter, reqIn *http.Request, urlOut *url.URL, client *http.Client) {
+	// Copy headers from incoming request, then add/replace proxy
+	// headers like Via and X-Forwarded-For.
+	hdrOut := http.Header{}
+	for k, v := range reqIn.Header {
+		if !dropHeaders[k] {
+			hdrOut[k] = v
+		}
+	}
+	xff := reqIn.RemoteAddr
+	if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
+		xff = xffIn + "," + xff
+	}
+	hdrOut.Set("X-Forwarded-For", xff)
+	// Identify this proxy by its configured name, falling back
+	// to the historical value if none was set.
+	via := p.Name
+	if via == "" {
+		via = "arvados-controller"
+	}
+	hdrOut.Add("Via", reqIn.Proto+" "+via)
+
+	ctx := reqIn.Context()
+	if p.RequestTimeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, p.RequestTimeout)
+		defer cancel()
+	}
+
+	reqOut := (&http.Request{
+		Method: reqIn.Method,
+		URL:    urlOut,
+		Header: hdrOut,
+		Body:   reqIn.Body,
+	}).WithContext(ctx)
+	resp, err := client.Do(reqOut)
+	if err != nil {
+		httpserver.Error(w, err.Error(), http.StatusBadGateway)
+		return
+	}
+	// The response body must be closed so the underlying
+	// connection is released.
+	defer resp.Body.Close()
+	for k, v := range resp.Header {
+		for _, v := range v {
+			w.Header().Add(k, v)
+		}
+	}
+	w.WriteHeader(resp.StatusCode)
+	n, err := io.Copy(w, resp.Body)
+	if err != nil {
+		httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
+	}
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "bytes"
+ "net/http"
+ "os"
+ "path/filepath"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+// logWriter adapts a "write log" function, typically
+// (*check.C)Log(), to the io.Writer interface.
+type logWriter struct {
+	logfunc func(...interface{})
+}
+
+// Write passes buf, minus any trailing newlines, to the log function
+// as a single string argument. It always reports the whole buffer as
+// written.
+func (tl *logWriter) Write(buf []byte) (int, error) {
+	line := bytes.TrimRight(buf, "\n")
+	tl.logfunc(string(line))
+	return len(buf), nil
+}
+
+// integrationTestCluster loads the "zzzzz" cluster configuration
+// from $WORKSPACE/tmp/arvados.yml. It panics if the file can't be
+// loaded or doesn't contain that cluster.
+func integrationTestCluster() *arvados.Cluster {
+	configPath := filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml")
+	cfg, err := arvados.GetConfig(configPath)
+	if err != nil {
+		panic(err)
+	}
+	cluster, err := cfg.GetCluster("zzzzz")
+	if err != nil {
+		panic(err)
+	}
+	return cluster
+}
+
+// newServerFromIntegrationTestEnv returns a new unstarted controller
+// server for cluster "zzzzz", using the Rails API provided by the
+// integration-testing environment ($ARVADOS_TEST_API_HOST) and
+// logging through c.Log.
+func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
+	logger := logrus.New()
+	logger.Formatter = &logrus.JSONFormatter{}
+	logger.Out = &logWriter{c.Log}
+
+	profile := arvados.NodeProfile{
+		Controller: arvados.SystemServiceInstance{Listen: ":"},
+		RailsAPI: arvados.SystemServiceInstance{
+			Listen:   os.Getenv("ARVADOS_TEST_API_HOST"),
+			TLS:      true,
+			Insecure: true,
+		},
+	}
+	cluster := &arvados.Cluster{
+		ClusterID:    "zzzzz",
+		PostgreSQL:   integrationTestCluster().PostgreSQL,
+		NodeProfiles: map[string]arvados.NodeProfile{"*": profile},
+	}
+	handler := &Handler{Cluster: cluster, NodeProfile: &profile}
+
+	return &httpserver.Server{
+		Server: http.Server{
+			Handler: httpserver.AddRequestIDs(httpserver.LogRequests(logger, handler)),
+		},
+		Addr: profile.Controller.Listen,
+	}
+}
type APIClientAuthorizationList struct {
Items []APIClientAuthorization `json:"items"`
}
+
+// TokenV2 returns the authorization's token in "v2/{UUID}/{APIToken}"
+// form, built from aca's UUID and APIToken fields.
+func (aca APIClientAuthorization) TokenV2() string {
+ return "v2/" + aca.UUID + "/" + aca.APIToken
+}
NodeProfiles map[string]NodeProfile
InstanceTypes InstanceTypeMap
HTTPRequestTimeout Duration
+ RemoteClusters map[string]RemoteCluster
+ PostgreSQL PostgreSQL
+}
+
+// PostgreSQL holds a cluster's database settings.
+type PostgreSQL struct {
+ // Connection holds the connection parameters.
+ Connection PostgreSQLConnection
+ // ConnectionPool is a connection pool size setting.
+ // NOTE(review): the controller applies it as the maximum
+ // number of open connections only when it is greater than
+ // zero -- confirm other consumers treat it the same way.
+ ConnectionPool int
+}
+
+// PostgreSQLConnection is a set of database connection parameters,
+// keyed by parameter name (e.g., "host", "dbname", "user",
+// "password").
+type PostgreSQLConnection map[string]string
+
+// RemoteCluster describes how to reach another cluster, identified
+// by its key in Cluster.RemoteClusters.
+type RemoteCluster struct {
+ // API endpoint host or host:port; default is {id}.arvadosapi.com
+ Host string
+ // Perform a proxy request when a local client requests an
+ // object belonging to this remote.
+ Proxy bool
+ // Scheme, default "https". Can be set to "http" for testing.
+ Scheme string
+ // Disable TLS verify. Can be set to true for testing.
+ Insecure bool
 }
type InstanceType struct {
}
type SystemServiceInstance struct {
- Listen string
- TLS bool
+ Listen string
+ TLS bool
+ Insecure bool
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+	"sort"
+	"strings"
+)
+
+// String returns c as a connection string of space-separated
+// key='value' pairs. Keys are lowercased; backslashes and single
+// quotes in values are escaped with a backslash.
+//
+// Pairs are emitted in sorted key order, so the same settings always
+// produce the same string regardless of map iteration order.
+func (c PostgreSQLConnection) String() string {
+	keys := make([]string, 0, len(c))
+	for k := range c {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	var b strings.Builder
+	for _, k := range keys {
+		b.WriteString(strings.ToLower(k))
+		b.WriteString("='")
+		b.WriteString(strings.Replace(
+			strings.Replace(c[k], `\`, `\\`, -1),
+			`'`, `\'`, -1))
+		b.WriteString("' ")
+	}
+	return b.String()
+}
FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
FooCollectionSharingToken = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+
+ WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml"
)
// PathologicalManifest : A valid manifest designed to test
// token.
var DecodeTokenCookie func(string) ([]byte, error) = base64.URLEncoding.DecodeString
-// LoadTokensFromHttpRequest loads all tokens it can find in the
+// LoadTokensFromHTTPRequest loads all tokens it can find in the
// headers and query string of an http query.
func (a *Credentials) LoadTokensFromHTTPRequest(r *http.Request) {
// Load plain token from "Authorization: OAuth2 ..." header
a.Tokens = append(a.Tokens, string(token))
}
-// TODO: LoadTokensFromHttpRequestBody(). We can't assume in
-// LoadTokensFromHttpRequest() that [or how] we should read and parse
-// the request body. This has to be requested explicitly by the
-// application.
+// LoadTokensFromHTTPRequestBody loads credentials from the request
+// body: if the Content-Type header is exactly
+// "application/x-www-form-urlencoded", the form is parsed and a
+// non-empty api_token value from the body is appended to a.Tokens.
+// Requests with any other content type are left untouched.
+//
+// This is separate from LoadTokensFromHTTPRequest() because it's not
+// always desirable to read the request body. This has to be requested
+// explicitly by the application.
+func (a *Credentials) LoadTokensFromHTTPRequestBody(r *http.Request) error {
+	if contentType := r.Header.Get("Content-Type"); contentType != "application/x-www-form-urlencoded" {
+		return nil
+	}
+	err := r.ParseForm()
+	if err != nil {
+		return err
+	}
+	token := r.PostFormValue("api_token")
+	if token == "" {
+		return nil
+	}
+	a.Tokens = append(a.Tokens, token)
+	return nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package auth
+
+import (
+ "crypto/hmac"
+ "crypto/sha1"
+ "errors"
+ "fmt"
+ "io"
+ "regexp"
+ "strings"
+)
+
+var (
+	// reObsoleteToken matches tokens in the old format: a bare
+	// string of at least 41 lowercase alphanumerics.
+	reObsoleteToken = regexp.MustCompile(`^[0-9a-z]{41,}$`)
+
+	// ErrObsoleteToken is returned by SaltToken for a token
+	// that matches reObsoleteToken instead of the v2 format.
+	ErrObsoleteToken = errors.New("obsolete token format")
+	// ErrTokenFormat is returned by SaltToken for a token that
+	// is in neither the v2 format nor the obsolete format.
+	ErrTokenFormat = errors.New("badly formatted token")
+	// ErrSalted is returned by SaltToken for a token that
+	// appears to be salted already, for some other cluster.
+	ErrSalted = errors.New("token already salted")
+)
+
+// SaltToken returns a version of the given "v2/{uuid}/{secret}"
+// token in which the secret is replaced by the hex-encoded
+// HMAC-SHA1 of the remote cluster ID, keyed with the original
+// secret.
+//
+// A secret that is exactly 40 characters long is taken to be salted
+// already: the token is then returned unchanged if its UUID starts
+// with remote, and rejected with ErrSalted otherwise.
+func SaltToken(token, remote string) (string, error) {
+	fields := strings.Split(token, "/")
+	if len(fields) < 3 || fields[0] != "v2" {
+		if reObsoleteToken.MatchString(token) {
+			return "", ErrObsoleteToken
+		}
+		return "", ErrTokenFormat
+	}
+	uuid, secret := fields[1], fields[2]
+	switch {
+	case len(secret) != 40:
+		// not already salted
+		mac := hmac.New(sha1.New, []byte(secret))
+		io.WriteString(mac, remote)
+		salted := fmt.Sprintf("%x", mac.Sum(nil))
+		return "v2/" + uuid + "/" + salted, nil
+	case strings.HasPrefix(uuid, remote):
+		// already salted for the desired remote
+		return token, nil
+	default:
+		// salted for a different remote, can't be used
+		return "", ErrSalted
+	}
+}
f.write("""
Clusters:
zzzzz:
+ PostgreSQL:
+ ConnectionPool: 32
+ Connection:
+ host: {}
+ dbname: {}
+ user: {}
+ password: {}
NodeProfiles:
"*":
"arvados-controller":
"arvados-api-server":
Listen: ":{}"
TLS: true
- """.format(port, rails_api_port))
+ Insecure: true
+ """.format(
+ _dbconfig('host'),
+ _dbconfig('database'),
+ _dbconfig('username'),
+ _dbconfig('password'),
+ port,
+ rails_api_port,
+ ))
logf = open(_logfilename('controller'), 'a')
controller = subprocess.Popen(
["arvados-server", "controller", "-config", conf],
end
def empty
- render text: "-"
+ render text: ""
end
end
type wsConfig struct {
Client arvados.Client
- Postgres pgConfig
+ Postgres arvados.PostgreSQLConnection
PostgresPool int
Listen string
LogLevel string
Client: arvados.Client{
APIHost: "localhost:443",
},
- Postgres: pgConfig{
+ Postgres: arvados.PostgreSQLConnection{
"dbname": "arvados_production",
"user": "arvados",
"password": "xyzzy",
"context"
"database/sql"
"strconv"
- "strings"
"sync"
"sync/atomic"
"time"
"github.com/lib/pq"
)
-type pgConfig map[string]string
-
-func (c pgConfig) ConnectionString() string {
- s := ""
- for k, v := range c {
- s += k
- s += "='"
- s += strings.Replace(
- strings.Replace(v, `\`, `\\`, -1),
- `'`, `\'`, -1)
- s += "' "
- }
- return s
-}
-
type pgEventSource struct {
DataSource string
MaxOpenConns int
import (
"database/sql"
"fmt"
+ "os"
+ "path/filepath"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
check "gopkg.in/check.v1"
)
type eventSourceSuite struct{}
-func testDBConfig() pgConfig {
- var railsDB struct {
- Test struct {
- Database string
- Username string
- Password string
- Host string
- }
- }
- err := config.LoadFile(&railsDB, "../api/config/database.yml")
+func testDBConfig() arvados.PostgreSQLConnection {
+ cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
if err != nil {
panic(err)
}
- cfg := pgConfig{
- "dbname": railsDB.Test.Database,
- "host": railsDB.Test.Host,
- "password": railsDB.Test.Password,
- "user": railsDB.Test.Username,
+ cc, err := cfg.GetCluster("zzzzz")
+ if err != nil {
+ panic(err)
}
- return cfg
+ return cc.PostgreSQL.Connection
}
func testDB() *sql.DB {
- db, err := sql.Open("postgres", testDBConfig().ConnectionString())
+ db, err := sql.Open("postgres", testDBConfig().String())
if err != nil {
panic(err)
}
cfg := testDBConfig()
db := testDB()
pges := &pgEventSource{
- DataSource: cfg.ConnectionString(),
+ DataSource: cfg.String(),
QueueSize: 4,
}
go pges.Run()
srv.listener = ln
srv.eventSource = &pgEventSource{
- DataSource: srv.wsConfig.Postgres.ConnectionString(),
+ DataSource: srv.wsConfig.Postgres.String(),
MaxOpenConns: srv.wsConfig.PostgresPool,
QueueSize: srv.wsConfig.ServerEventQueue,
}