13493: Merge branch 'master' into 13493-federation-proxy
author Tom Clegg <tclegg@veritasgenetics.com>
Mon, 16 Jul 2018 13:46:23 +0000 (09:46 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Mon, 16 Jul 2018 13:46:23 +0000 (09:46 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

18 files changed:
lib/controller/federation.go [new file with mode: 0644]
lib/controller/federation_test.go [new file with mode: 0644]
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/proxy.go [new file with mode: 0644]
lib/controller/server_test.go [new file with mode: 0644]
sdk/go/arvados/api_client_authorization.go
sdk/go/arvados/config.go
sdk/go/arvados/postgresql.go [new file with mode: 0644]
sdk/go/arvadostest/fixtures.go
sdk/go/auth/auth.go
sdk/go/auth/salt.go [new file with mode: 0644]
sdk/python/tests/run_test_server.py
services/api/app/controllers/static_controller.rb
services/ws/config.go
services/ws/event_source.go
services/ws/event_source_test.go
services/ws/server.go

diff --git a/lib/controller/federation.go b/lib/controller/federation.go
new file mode 100644 (file)
index 0000000..24b9250
--- /dev/null
@@ -0,0 +1,117 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "bytes"
+       "database/sql"
+       "io/ioutil"
+       "net/http"
+       "net/url"
+       "regexp"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+// wfRe matches the path of a request for a single workflow and
+// captures the five-character prefix of the workflow UUID, which
+// proxyRemoteCluster compares against the local cluster ID.
+var wfRe = regexp.MustCompile(`^/arvados/v1/workflows/([0-9a-z]{5})-[^/]+$`)
+
+// proxyRemoteCluster is a middleware that forwards single-workflow
+// requests addressed to another cluster to that cluster, after
+// salting the caller's token for it. Requests whose path does not
+// match wfRe, or whose UUID prefix is this cluster's own ID, are
+// handed to next unchanged.
+func (h *Handler) proxyRemoteCluster(w http.ResponseWriter, req *http.Request, next http.Handler) {
+       match := wfRe.FindStringSubmatch(req.URL.Path)
+       if isRemote := len(match) >= 2 && match[1] != h.Cluster.ClusterID; !isRemote {
+               next.ServeHTTP(w, req)
+               return
+       }
+       clusterID := match[1]
+       rc, found := h.Cluster.RemoteClusters[clusterID]
+       if !found {
+               httpserver.Error(w, "no proxy available for cluster "+clusterID, http.StatusNotFound)
+               return
+       }
+       // Salt the token before building the outgoing URL: salting
+       // may strip api_token from req.URL.RawQuery.
+       if err := h.saltAuthToken(req, clusterID); err != nil {
+               httpserver.Error(w, err.Error(), http.StatusBadRequest)
+               return
+       }
+       target := &url.URL{
+               Scheme:   "https", // default when the remote has no explicit scheme
+               Host:     rc.Host,
+               Path:     req.URL.Path,
+               RawPath:  req.URL.RawPath,
+               RawQuery: req.URL.RawQuery,
+       }
+       if rc.Scheme != "" {
+               target.Scheme = rc.Scheme
+       }
+       httpClient := h.secureClient
+       if rc.Insecure {
+               httpClient = h.insecureClient
+       }
+       h.proxy.Do(w, req, target, httpClient)
+}
+
+// saltAuthToken extracts the auth token supplied in req and replaces
+// it with a token salted for the remote cluster, delivered in the
+// Authorization header.
+//
+// The token is taken from wherever LoadTokensFromHTTPRequest finds
+// it or, failing that, from the api_token field of a form-encoded
+// request body. If no token is found, no Authorization header is
+// set and nil is returned.
+//
+// A token that auth.SaltToken reports as obsolete is looked up in
+// the local database: if an unexpired match exists, its V2 form is
+// salted; otherwise it is assumed to have been issued by the remote
+// and is passed through unmodified.
+func (h *Handler) saltAuthToken(req *http.Request, remote string) error {
+       creds := auth.NewCredentials()
+       creds.LoadTokensFromHTTPRequest(req)
+       if len(creds.Tokens) == 0 && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+               // Override ParseForm's 10MiB limit by ensuring
+               // req.Body is a *http.maxBytesReader.
+               req.Body = http.MaxBytesReader(nil, req.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
+               if err := creds.LoadTokensFromHTTPRequestBody(req); err != nil {
+                       return err
+               }
+               // The body has been consumed by form parsing. Replace
+               // it with a buffer that re-encodes the form without
+               // api_token, in case we end up forwarding the
+               // request. (Encode() on a nil PostForm yields "".)
+               if req.PostForm != nil {
+                       req.PostForm.Del("api_token")
+               }
+               req.Body = ioutil.NopCloser(bytes.NewBufferString(req.PostForm.Encode()))
+       }
+       if len(creds.Tokens) == 0 {
+               return nil
+       }
+       token, err := auth.SaltToken(creds.Tokens[0], remote)
+       if err == auth.ErrObsoleteToken {
+               // If the token exists in our own database, salt it
+               // for the remote. Otherwise, assume it was issued by
+               // the remote, and pass it through unmodified.
+               db, err := h.db(req)
+               if err != nil {
+                       return err
+               }
+               aca := arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}
+               err = db.QueryRowContext(req.Context(), `SELECT uuid FROM api_client_authorizations WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, aca.APIToken).Scan(&aca.UUID)
+               switch {
+               case err == sql.ErrNoRows:
+                       // Not ours; pass through unmodified.
+                       token = aca.APIToken
+               case err != nil:
+                       return err
+               default:
+                       // Found; make V2 version and salt it.
+                       token, err = auth.SaltToken(aca.TokenV2(), remote)
+                       if err != nil {
+                               return err
+                       }
+               }
+       } else if err != nil {
+               return err
+       }
+       req.Header.Set("Authorization", "Bearer "+token)
+
+       // Remove api_token=... from the query string, in case we end
+       // up forwarding the request.
+       values, err := url.ParseQuery(req.URL.RawQuery)
+       if err != nil {
+               return err
+       }
+       if _, ok := values["api_token"]; ok {
+               delete(values, "api_token")
+               req.URL.RawQuery = values.Encode()
+       }
+       return nil
+}
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
new file mode 100644 (file)
index 0000000..2682092
--- /dev/null
@@ -0,0 +1,301 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "encoding/json"
+       "io/ioutil"
+       "net/http"
+       "net/http/httptest"
+       "net/url"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate: register FederationSuite with the gocheck
+// runner.
+var _ = check.Suite(&FederationSuite{})
+
+// FederationSuite tests how the controller handles requests that
+// are addressed to remote clusters.
+type FederationSuite struct {
+       // log receives the request logs of the controller being
+       // tested; SetUpTest routes its output to the test log.
+       log *logrus.Logger
+       // testServer and testHandler are the controller being tested,
+       // "zhome".
+       testServer  *httpserver.Server
+       testHandler *Handler
+       // remoteServer ("zzzzz") forwards requests to the Rails API
+       // provided by the integration test environment.
+       remoteServer *httpserver.Server
+       // remoteMock ("zmock") appends each incoming request to
+       // remoteMockRequests, and returns an empty 200 response.
+       remoteMock         *httpserver.Server
+       remoteMockRequests []http.Request
+}
+
+// SetUpTest starts the two remote clusters ("zzzzz" and "zmock")
+// and then the "zhome" controller under test, which is configured
+// to reach both remotes over plain http.
+func (s *FederationSuite) SetUpTest(c *check.C) {
+       logger := logrus.New()
+       logger.Formatter = &logrus.JSONFormatter{}
+       logger.Out = &logWriter{c.Log}
+       s.log = logger
+
+       // The remotes must be started first: their listen addresses
+       // go into the home cluster's RemoteClusters table below.
+       s.remoteServer = newServerFromIntegrationTestEnv(c)
+       c.Assert(s.remoteServer.Start(), check.IsNil)
+
+       s.remoteMock = newServerFromIntegrationTestEnv(c)
+       s.remoteMock.Server.Handler = http.HandlerFunc(s.remoteMockHandler)
+       c.Assert(s.remoteMock.Start(), check.IsNil)
+
+       np := arvados.NodeProfile{
+               Controller: arvados.SystemServiceInstance{Listen: ":"},
+               RailsAPI:   arvados.SystemServiceInstance{Listen: ":1"}, // local reqs will error "connection refused"
+       }
+       cluster := &arvados.Cluster{
+               ClusterID:  "zhome",
+               PostgreSQL: integrationTestCluster().PostgreSQL,
+               NodeProfiles: map[string]arvados.NodeProfile{
+                       "*": np,
+               },
+               RemoteClusters: map[string]arvados.RemoteCluster{
+                       "zzzzz": {
+                               Host:   s.remoteServer.Addr,
+                               Proxy:  true,
+                               Scheme: "http",
+                       },
+                       "zmock": {
+                               Host:   s.remoteMock.Addr,
+                               Proxy:  true,
+                               Scheme: "http",
+                       },
+               },
+       }
+       s.testHandler = &Handler{Cluster: cluster, NodeProfile: &np}
+
+       s.testServer = newServerFromIntegrationTestEnv(c)
+       s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler))
+       c.Assert(s.testServer.Start(), check.IsNil)
+
+       s.remoteMockRequests = nil
+}
+
+// remoteMockHandler records a copy of each request received by the
+// "zmock" remote. It writes no response of its own.
+func (s *FederationSuite) remoteMockHandler(w http.ResponseWriter, req *http.Request) {
+       captured := *req
+       s.remoteMockRequests = append(s.remoteMockRequests, captured)
+}
+
+// TearDownTest shuts down every server started by SetUpTest,
+// including the "zmock" remote, so no listener outlives its test.
+// The nil checks guard against servers that were never created.
+func (s *FederationSuite) TearDownTest(c *check.C) {
+       for _, srv := range []*httpserver.Server{s.remoteServer, s.remoteMock, s.testServer} {
+               if srv != nil {
+                       srv.Close()
+               }
+       }
+}
+
+// testRequest runs req through the "zhome" handler stack in-process
+// and returns the recorded response.
+func (s *FederationSuite) testRequest(req *http.Request) *http.Response {
+       rec := httptest.NewRecorder()
+       handler := s.testServer.Server.Handler
+       handler.ServeHTTP(rec, req)
+       return rec.Result()
+}
+
+// TestLocalRequest checks that a workflow UUID carrying the home
+// cluster's own prefix is handled locally rather than proxied to a
+// remote.
+func (s *FederationSuite) TestLocalRequest(c *check.C) {
+       localUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zhome-", 1)
+       req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+localUUID, nil)
+       s.checkHandledLocally(c, s.testRequest(req))
+}
+
+// checkHandledLocally asserts that resp is what the home controller
+// produces when it tries to serve a request itself.
+func (s *FederationSuite) checkHandledLocally(c *check.C, resp *http.Response) {
+       // The "home" controller has no stub/test Rails API of its
+       // own, so a local request cannot succeed. A "connection
+       // refused" gateway error is therefore the evidence that the
+       // controller tried to proxy to its local Rails API.
+       const wantStatus = http.StatusBadGateway
+       c.Check(resp.StatusCode, check.Equals, wantStatus)
+       s.checkJSONErrorMatches(c, resp, `.*connection refused`)
+}
+
+// TestNoAuth checks that a remote workflow request without any
+// token is answered with 401 "Not logged in".
+func (s *FederationSuite) TestNoAuth(c *check.C) {
+       path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+       resp := s.testRequest(httptest.NewRequest("GET", path, nil))
+       c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+       s.checkJSONErrorMatches(c, resp, `Not logged in`)
+}
+
+// TestBadAuth checks that a remote workflow request bearing an
+// unknown token is answered with 401 "Not logged in".
+func (s *FederationSuite) TestBadAuth(c *check.C) {
+       path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+       req := httptest.NewRequest("GET", path, nil)
+       req.Header.Set("Authorization", "Bearer aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+       s.checkJSONErrorMatches(c, resp, `Not logged in`)
+}
+
+// TestNoAccess checks that a request made with the spectator token
+// for the fixture workflow is answered with 404.
+func (s *FederationSuite) TestNoAccess(c *check.C) {
+       path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+       req := httptest.NewRequest("GET", path, nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.SpectatorToken)
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+       s.checkJSONErrorMatches(c, resp, `.*not found`)
+}
+
+// TestGetUnknownRemote checks that a UUID whose cluster prefix is
+// not in RemoteClusters yields 404 with a "no proxy available"
+// error.
+func (s *FederationSuite) TestGetUnknownRemote(c *check.C) {
+       unknownUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zz404-", 1)
+       req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+unknownUUID, nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+       s.checkJSONErrorMatches(c, resp, `.*no proxy available for cluster zz404`)
+}
+
+// TestRemoteError checks that a failure to talk to the remote is
+// reported as 502. The failure is provoked by pointing an https
+// client at the remote's plain-http listener.
+func (s *FederationSuite) TestRemoteError(c *check.C) {
+       remotes := s.testHandler.Cluster.RemoteClusters
+       zzzzz := remotes["zzzzz"]
+       zzzzz.Scheme = "https"
+       remotes["zzzzz"] = zzzzz
+
+       path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+       req := httptest.NewRequest("GET", path, nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusBadGateway)
+       s.checkJSONErrorMatches(c, resp, `.*HTTP response to HTTPS client`)
+}
+
+// TestGetRemoteWorkflow checks that a workflow owned by the active
+// user on the "zzzzz" remote can be fetched through the home
+// controller.
+func (s *FederationSuite) TestGetRemoteWorkflow(c *check.C) {
+       path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+       req := httptest.NewRequest("GET", path, nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+       var got arvados.Workflow
+       err := json.NewDecoder(resp.Body).Decode(&got)
+       c.Check(err, check.IsNil)
+       c.Check(got.UUID, check.Equals, arvadostest.WorkflowWithDefinitionYAMLUUID)
+       c.Check(got.OwnerUUID, check.Equals, arvadostest.ActiveUserUUID)
+}
+
+// TestOptionsMethod checks the CORS preflight response for a remote
+// workflow path: 200, empty body, wildcard origin, and the expected
+// allowed headers and methods.
+func (s *FederationSuite) TestOptionsMethod(c *check.C) {
+       path := "/arvados/v1/workflows/" + arvadostest.WorkflowWithDefinitionYAMLUUID
+       req := httptest.NewRequest("OPTIONS", path, nil)
+       req.Header.Set("Origin", "https://example.com")
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       payload, err := ioutil.ReadAll(resp.Body)
+       c.Check(err, check.IsNil)
+       c.Check(string(payload), check.Equals, "")
+       c.Check(resp.Header.Get("Access-Control-Allow-Origin"), check.Equals, "*")
+
+       // Each token must appear somewhere in the named response
+       // header.
+       expectations := []struct {
+               header string
+               tokens []string
+       }{
+               {"Access-Control-Allow-Headers", []string{"Authorization", "Content-Type"}},
+               {"Access-Control-Allow-Methods", []string{"GET", "HEAD", "PUT", "POST", "DELETE"}},
+       }
+       for _, want := range expectations {
+               for _, token := range want.tokens {
+                       c.Check(resp.Header.Get(want.header), check.Matches, ".*"+token+".*")
+               }
+       }
+}
+
+// TestRemoteWithTokenInQuery checks that a token supplied as an
+// api_token query parameter reaches the remote salted, in the
+// Authorization header, and no longer in the URL.
+func (s *FederationSuite) TestRemoteWithTokenInQuery(c *check.C) {
+       mockUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1)
+       target := "/arvados/v1/workflows/" + mockUUID + "?api_token=" + arvadostest.ActiveToken
+       s.testRequest(httptest.NewRequest("GET", target, nil))
+       c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+       forwarded := s.remoteMockRequests[0]
+       // Token is salted and moved from query to Authorization header.
+       c.Check(forwarded.URL.String(), check.Not(check.Matches), `.*api_token=.*`)
+       c.Check(forwarded.Header.Get("Authorization"), check.Equals, "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/7fd31b61f39c0e82a4155592163218272cedacdc")
+}
+
+// TestLocalTokenSalted checks that a token found in the home
+// cluster's database is converted to a salted V2 token before being
+// forwarded to the remote.
+func (s *FederationSuite) TestLocalTokenSalted(c *check.C) {
+       mockUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1)
+       req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+mockUUID, nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       s.testRequest(req)
+       c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+       forwarded := s.remoteMockRequests[0]
+       // The salted token carries a "zzzzz-" UUID rather than a
+       // "zhome-" UUID because the home controller's local database
+       // holds the "zzzzz-" test fixtures. The "secret" part is
+       // HMAC(sha1, arvadostest.ActiveToken, "zmock") = "7fd3...".
+       c.Check(forwarded.Header.Get("Authorization"), check.Equals, "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/7fd31b61f39c0e82a4155592163218272cedacdc")
+}
+
+// TestRemoteTokenNotSalted checks that a v1 token unknown to the
+// home cluster is forwarded to the remote unmodified.
+func (s *FederationSuite) TestRemoteTokenNotSalted(c *check.C) {
+       // Any v1 token that is absent from the home controller's
+       // local db will do.
+       const foreignToken = "abcdef00000000000000000000000000000000000000000000"
+       mockUUID := strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1)
+       req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+mockUUID, nil)
+       req.Header.Set("Authorization", "Bearer "+foreignToken)
+       s.testRequest(req)
+       c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+       forwarded := s.remoteMockRequests[0]
+       c.Check(forwarded.Header.Get("Authorization"), check.Equals, "Bearer "+foreignToken)
+}
+
+// TestWorkflowCRUD creates a workflow directly on the "zzzzz"
+// remote, then updates (PATCH, PUT, and POST with _method=PATCH),
+// deletes, and finally fails to fetch it through the home
+// controller.
+func (s *FederationSuite) TestWorkflowCRUD(c *check.C) {
+       wf := arvados.Workflow{
+               Description: "TestCRUD",
+       }
+       {
+               body := &strings.Builder{}
+               c.Assert(json.NewEncoder(body).Encode(&wf), check.IsNil)
+               req := httptest.NewRequest("POST", "/arvados/v1/workflows", strings.NewReader(url.Values{
+                       "workflow": {body.String()},
+               }.Encode()))
+               req.Header.Set("Content-type", "application/x-www-form-urlencoded")
+               req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+               rec := httptest.NewRecorder()
+               s.remoteServer.Server.Handler.ServeHTTP(rec, req) // direct to remote -- can't proxy a create req because no uuid
+               resp := rec.Result()
+               s.checkResponseOK(c, resp)
+               c.Check(json.NewDecoder(resp.Body).Decode(&wf), check.IsNil)
+
+               // Without a UUID nothing was created, so there is
+               // nothing to clean up and nothing further to test.
+               c.Assert(wf.UUID, check.Not(check.Equals), "")
+
+               // Best-effort cleanup in case the test fails before
+               // its own DELETE step.
+               defer func() {
+                       req := httptest.NewRequest("DELETE", "/arvados/v1/workflows/"+wf.UUID, nil)
+                       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+                       s.remoteServer.Server.Handler.ServeHTTP(httptest.NewRecorder(), req)
+               }()
+
+               c.Assert(wf.ModifiedAt, check.NotNil)
+               c.Logf("wf.ModifiedAt: %v", wf.ModifiedAt)
+               c.Check(time.Since(*wf.ModifiedAt) < time.Minute, check.Equals, true)
+       }
+       for _, method := range []string{"PATCH", "PUT", "POST"} {
+               form := url.Values{
+                       "workflow": {`{"description": "Updated with ` + method + `"}`},
+               }
+               if method == "POST" {
+                       form["_method"] = []string{"PATCH"}
+               }
+               req := httptest.NewRequest(method, "/arvados/v1/workflows/"+wf.UUID, strings.NewReader(form.Encode()))
+               req.Header.Set("Content-type", "application/x-www-form-urlencoded")
+               req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+               resp := s.testRequest(req)
+               s.checkResponseOK(c, resp)
+               err := json.NewDecoder(resp.Body).Decode(&wf)
+               c.Check(err, check.IsNil)
+
+               c.Check(wf.Description, check.Equals, "Updated with "+method)
+       }
+       {
+               req := httptest.NewRequest("DELETE", "/arvados/v1/workflows/"+wf.UUID, nil)
+               req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+               resp := s.testRequest(req)
+               s.checkResponseOK(c, resp)
+               err := json.NewDecoder(resp.Body).Decode(&wf)
+               c.Check(err, check.IsNil)
+       }
+       {
+               req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+wf.UUID, nil)
+               req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+               resp := s.testRequest(req)
+               c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+       }
+}
+
+// checkResponseOK records a test failure unless resp has status
+// 200. On failure it also logs the response body to help diagnosis,
+// which consumes the body.
+func (s *FederationSuite) checkResponseOK(c *check.C, resp *http.Response) {
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       if resp.StatusCode == http.StatusOK {
+               return
+       }
+       body, err := ioutil.ReadAll(resp.Body)
+       c.Logf("... response body = %q, %v\n", body, err)
+}
+
+// checkJSONErrorMatches decodes resp's body as a JSON error
+// response and checks that it holds exactly one error message, and
+// that the message matches the regular expression re.
+func (s *FederationSuite) checkJSONErrorMatches(c *check.C, resp *http.Response, re string) {
+       var decoded httpserver.ErrorResponse
+       c.Check(json.NewDecoder(resp.Body).Decode(&decoded), check.IsNil)
+       // Stop here on a wrong count: indexing below would panic.
+       c.Assert(len(decoded.Errors), check.Equals, 1)
+       c.Check(decoded.Errors[0], check.Matches, re)
+}
index a1a69a88e4ccd1bbb6d4882620581e91b3a03523..69b1866162c6fe1488ba3ca0b76d92817a4658ef 100644 (file)
@@ -5,8 +5,8 @@
 package controller
 
 import (
-       "context"
-       "io"
+       "database/sql"
+       "errors"
        "net"
        "net/http"
        "net/url"
@@ -17,15 +17,20 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       _ "github.com/lib/pq"
 )
 
 type Handler struct {
        Cluster     *arvados.Cluster
        NodeProfile *arvados.NodeProfile
 
-       setupOnce    sync.Once
-       handlerStack http.Handler
-       proxyClient  *arvados.Client
+       setupOnce      sync.Once
+       handlerStack   http.Handler
+       proxy          *proxy
+       secureClient   *http.Client
+       insecureClient *http.Client
+       pgdb           *sql.DB
+       pgdbMtx        sync.Mutex
 }
 
 func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -49,7 +54,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 func (h *Handler) CheckHealth() error {
        h.setupOnce.Do(h.setup)
-       _, err := findRailsAPI(h.Cluster, h.NodeProfile)
+       _, _, err := findRailsAPI(h.Cluster, h.NodeProfile)
        return err
 }
 
@@ -59,94 +64,87 @@ func (h *Handler) setup() {
                Token:  h.Cluster.ManagementToken,
                Prefix: "/_health/",
        })
-       mux.Handle("/", http.HandlerFunc(h.proxyRailsAPI))
+       hs := http.NotFoundHandler()
+       hs = prepend(hs, h.proxyRailsAPI)
+       hs = prepend(hs, h.proxyRemoteCluster)
+       mux.Handle("/", hs)
        h.handlerStack = mux
 
+       sc := *arvados.DefaultSecureClient
+       sc.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
+       h.secureClient = &sc
+
+       ic := *arvados.InsecureHTTPClient
+       ic.Timeout = time.Duration(h.Cluster.HTTPRequestTimeout)
+       h.insecureClient = &ic
+
+       h.proxy = &proxy{
+               Name:           "arvados-controller",
+               RequestTimeout: time.Duration(h.Cluster.HTTPRequestTimeout),
+       }
+
        // Changing the global isn't the right way to do this, but a
        // proper solution would conflict with an impending 13493
        // merge anyway, so this will do for now.
        arvados.InsecureHTTPClient.CheckRedirect = func(*http.Request, []*http.Request) error { return http.ErrUseLastResponse }
 }
 
-// headers that shouldn't be forwarded when proxying. See
-// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
-var dropHeaders = map[string]bool{
-       "Connection":          true,
-       "Keep-Alive":          true,
-       "Proxy-Authenticate":  true,
-       "Proxy-Authorization": true,
-       "TE":                true,
-       "Trailer":           true,
-       "Transfer-Encoding": true,
-       "Upgrade":           true,
-}
+var errDBConnection = errors.New("database connection error")
 
-func (h *Handler) proxyRailsAPI(w http.ResponseWriter, reqIn *http.Request) {
-       urlOut, err := findRailsAPI(h.Cluster, h.NodeProfile)
-       if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusInternalServerError)
-               return
-       }
-       urlOut = &url.URL{
-               Scheme:   urlOut.Scheme,
-               Host:     urlOut.Host,
-               Path:     reqIn.URL.Path,
-               RawPath:  reqIn.URL.RawPath,
-               RawQuery: reqIn.URL.RawQuery,
+func (h *Handler) db(req *http.Request) (*sql.DB, error) {
+       h.pgdbMtx.Lock()
+       defer h.pgdbMtx.Unlock()
+       if h.pgdb != nil {
+               return h.pgdb, nil
        }
 
-       // Copy headers from incoming request, then add/replace proxy
-       // headers like Via and X-Forwarded-For.
-       hdrOut := http.Header{}
-       for k, v := range reqIn.Header {
-               if !dropHeaders[k] {
-                       hdrOut[k] = v
-               }
+       db, err := sql.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
+       if err != nil {
+               httpserver.Logger(req).WithError(err).Error("postgresql connect failed")
+               return nil, errDBConnection
        }
-       xff := reqIn.RemoteAddr
-       if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
-               xff = xffIn + "," + xff
+       if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
+               db.SetMaxOpenConns(p)
        }
-       hdrOut.Set("X-Forwarded-For", xff)
-       if hdrOut.Get("X-Forwarded-Proto") == "" {
-               hdrOut.Set("X-Forwarded-Proto", reqIn.URL.Scheme)
+       if err := db.Ping(); err != nil {
+               httpserver.Logger(req).WithError(err).Error("postgresql connect succeeded but ping failed")
+               return nil, errDBConnection
        }
-       hdrOut.Add("Via", reqIn.Proto+" arvados-controller")
+       h.pgdb = db
+       return db, nil
+}
 
-       ctx := reqIn.Context()
-       if timeout := h.Cluster.HTTPRequestTimeout; timeout > 0 {
-               var cancel context.CancelFunc
-               ctx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(timeout)))
-               defer cancel()
-       }
+type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
+
+func prepend(next http.Handler, middleware middlewareFunc) http.Handler {
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               middleware(w, req, next)
+       })
+}
 
-       reqOut := (&http.Request{
-               Method: reqIn.Method,
-               URL:    urlOut,
-               Host:   reqIn.Host,
-               Header: hdrOut,
-               Body:   reqIn.Body,
-       }).WithContext(ctx)
-       resp, err := arvados.InsecureHTTPClient.Do(reqOut)
+func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
+       urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
        if err != nil {
                httpserver.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
-       for k, v := range resp.Header {
-               for _, v := range v {
-                       w.Header().Add(k, v)
-               }
+       urlOut = &url.URL{
+               Scheme:   urlOut.Scheme,
+               Host:     urlOut.Host,
+               Path:     req.URL.Path,
+               RawPath:  req.URL.RawPath,
+               RawQuery: req.URL.RawQuery,
        }
-       w.WriteHeader(resp.StatusCode)
-       n, err := io.Copy(w, resp.Body)
-       if err != nil {
-               httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
+       client := h.secureClient
+       if insecure {
+               client = h.insecureClient
        }
+       h.proxy.Do(w, req, urlOut, client)
 }
 
 // For now, findRailsAPI always uses the rails API running on this
 // node.
-func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, error) {
+func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, bool, error) {
        hostport := np.RailsAPI.Listen
        if len(hostport) > 1 && hostport[0] == ':' && strings.TrimRight(hostport[1:], "0123456789") == "" {
                // ":12345" => connect to indicated port on localhost
@@ -154,11 +152,12 @@ func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL,
        } else if _, _, err := net.SplitHostPort(hostport); err == nil {
                // "[::1]:12345" => connect to indicated address & port
        } else {
-               return nil, err
+               return nil, false, err
        }
        proto := "http"
        if np.RailsAPI.TLS {
                proto = "https"
        }
-       return url.Parse(proto + "://" + hostport)
+       url, err := url.Parse(proto + "://" + hostport)
+       return url, np.RailsAPI.Insecure, err
 }
index eb947ea363705293679da1edd3430e2d8d5c0657..2f9280e11bc1a99f6822b3ef2c8a37d8133974b4 100644 (file)
@@ -34,11 +34,12 @@ type HandlerSuite struct {
 
 func (s *HandlerSuite) SetUpTest(c *check.C) {
        s.cluster = &arvados.Cluster{
-               ClusterID: "zzzzz",
+               ClusterID:  "zzzzz",
+               PostgreSQL: integrationTestCluster().PostgreSQL,
                NodeProfiles: map[string]arvados.NodeProfile{
                        "*": {
                                Controller: arvados.SystemServiceInstance{Listen: ":"},
-                               RailsAPI:   arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true},
+                               RailsAPI:   arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true, Insecure: true},
                        },
                },
        }
@@ -65,12 +66,12 @@ func (s *HandlerSuite) TestRequestTimeout(c *check.C) {
        req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
-       c.Check(resp.Code, check.Equals, http.StatusInternalServerError)
+       c.Check(resp.Code, check.Equals, http.StatusBadGateway)
        var jresp httpserver.ErrorResponse
        err := json.Unmarshal(resp.Body.Bytes(), &jresp)
        c.Check(err, check.IsNil)
        c.Assert(len(jresp.Errors), check.Equals, 1)
-       c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded`)
+       c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded.*`)
 }
 
 func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) {
@@ -101,6 +102,7 @@ func (s *HandlerSuite) TestProxyWithTokenInRequestBody(c *check.C) {
                "_method":   {"GET"},
                "api_token": {arvadostest.ActiveToken},
        }.Encode()))
+       req.Header.Set("Content-type", "application/x-www-form-urlencoded")
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
        c.Check(resp.Code, check.Equals, http.StatusOK)
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
new file mode 100644 (file)
index 0000000..1ff383d
--- /dev/null
@@ -0,0 +1,79 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "context"
+       "io"
+       "net/http"
+       "net/url"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+// proxy forwards incoming HTTP requests to another URL and relays
+// the responses; see Do. A positive RequestTimeout is applied as a
+// deadline to each forwarded request.
+type proxy struct {
+       Name           string // to use in Via header
+       RequestTimeout time.Duration
+}
+
+// headers that shouldn't be forwarded when proxying. See
+// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
+//
+// Keys must be in canonical MIME header form (see
+// http.CanonicalHeaderKey), because that is how net/http stores the
+// names of incoming headers. In particular the TE header is stored
+// as "Te", so a "TE" key would never match.
+var dropHeaders = map[string]bool{
+       "Connection":          true,
+       "Keep-Alive":          true,
+       "Proxy-Authenticate":  true,
+       "Proxy-Authorization": true,
+       "Te":                  true, // canonical form of "TE"
+       "Trailer":             true,
+       "Transfer-Encoding":   true,
+       "Upgrade":             true,
+}
+
+// Do forwards reqIn to urlOut using client, and copies the upstream
+// response (headers, status, body) to w.
+//
+// Headers listed in dropHeaders are not forwarded. The caller's
+// address is appended to X-Forwarded-For and a Via entry is added,
+// using p.Name (or "arvados-controller" if p.Name is empty). If
+// p.RequestTimeout is positive, the outgoing request is bound by
+// that timeout. If the upstream request fails, the error is
+// reported to the caller with status 502 Bad Gateway.
+func (p *proxy) Do(w http.ResponseWriter, reqIn *http.Request, urlOut *url.URL, client *http.Client) {
+       // Copy headers from incoming request, then add/replace proxy
+       // headers like Via and X-Forwarded-For.
+       hdrOut := http.Header{}
+       for k, v := range reqIn.Header {
+               if !dropHeaders[k] {
+                       // Copy the values so that adding to hdrOut
+                       // can never write through to reqIn.Header.
+                       hdrOut[k] = append([]string(nil), v...)
+               }
+       }
+       xff := reqIn.RemoteAddr
+       if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
+               xff = xffIn + "," + xff
+       }
+       hdrOut.Set("X-Forwarded-For", xff)
+       via := p.Name
+       if via == "" {
+               via = "arvados-controller"
+       }
+       hdrOut.Add("Via", reqIn.Proto+" "+via)
+
+       ctx := reqIn.Context()
+       if p.RequestTimeout > 0 {
+               var cancel context.CancelFunc
+               ctx, cancel = context.WithTimeout(ctx, p.RequestTimeout)
+               defer cancel()
+       }
+
+       reqOut := (&http.Request{
+               Method: reqIn.Method,
+               URL:    urlOut,
+               Header: hdrOut,
+               Body:   reqIn.Body,
+       }).WithContext(ctx)
+       resp, err := client.Do(reqOut)
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusBadGateway)
+               return
+       }
+       // The body must be closed so the client's transport can
+       // release (and reuse) the upstream connection.
+       defer resp.Body.Close()
+       for k, v := range resp.Header {
+               for _, v := range v {
+                       w.Header().Add(k, v)
+               }
+       }
+       w.WriteHeader(resp.StatusCode)
+       n, err := io.Copy(w, resp.Body)
+       if err != nil {
+               // The status line has already been sent, so the
+               // failure can only be logged.
+               httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
+       }
+}
diff --git a/lib/controller/server_test.go b/lib/controller/server_test.go
new file mode 100644 (file)
index 0000000..7742cf4
--- /dev/null
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "bytes"
+       "net/http"
+       "os"
+       "path/filepath"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+// logWriter adapts a "write log" function, typically
+// (*check.C).Log, to the io.Writer interface.
+type logWriter struct {
+       logfunc func(...interface{})
+}
+
+// Write passes buf, minus any trailing newlines, to the log function
+// as a single string. It always reports the whole buffer as written
+// and never returns an error.
+func (lw *logWriter) Write(buf []byte) (int, error) {
+       msg := bytes.TrimRight(buf, "\n")
+       lw.logfunc(string(msg))
+       return len(buf), nil
+}
+
+// integrationTestCluster loads $WORKSPACE/tmp/arvados.yml and returns
+// its "zzzzz" cluster configuration. It panics if the config file
+// cannot be loaded or the cluster cannot be retrieved from it.
+func integrationTestCluster() *arvados.Cluster {
+       path := filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml")
+       cfg, err := arvados.GetConfig(path)
+       if err != nil {
+               panic(err)
+       }
+       cluster, err := cfg.GetCluster("zzzzz")
+       if err != nil {
+               panic(err)
+       }
+       return cluster
+}
+
+// newServerFromIntegrationTestEnv returns a new unstarted controller
+// server, using the Rails API provided by the integration-testing
+// environment ($ARVADOS_TEST_API_HOST). Requests are logged in JSON
+// format through c.Log.
+func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
+       logger := logrus.New()
+       logger.Formatter = &logrus.JSONFormatter{}
+       logger.Out = &logWriter{c.Log}
+
+       nodeProfile := arvados.NodeProfile{
+               Controller: arvados.SystemServiceInstance{Listen: ":"},
+               RailsAPI: arvados.SystemServiceInstance{
+                       Listen:   os.Getenv("ARVADOS_TEST_API_HOST"),
+                       TLS:      true,
+                       Insecure: true,
+               },
+       }
+       cluster := &arvados.Cluster{
+               ClusterID:    "zzzzz",
+               PostgreSQL:   integrationTestCluster().PostgreSQL,
+               NodeProfiles: map[string]arvados.NodeProfile{"*": nodeProfile},
+       }
+       handler := &Handler{Cluster: cluster, NodeProfile: &nodeProfile}
+
+       return &httpserver.Server{
+               Server: http.Server{
+                       Handler: httpserver.AddRequestIDs(httpserver.LogRequests(logger, handler)),
+               },
+               Addr: nodeProfile.Controller.Listen,
+       }
+}
index 3343bdb9aa0e4fcec1d478489dbbf387cc0ff1dd..ec0239eb37bf0a45bb715b35eab757c6c94850d5 100644 (file)
@@ -14,3 +14,7 @@ type APIClientAuthorization struct {
 type APIClientAuthorizationList struct {
        Items []APIClientAuthorization `json:"items"`
 }
+
+// TokenV2 returns the authorization as a "v2/{UUID}/{APIToken}"
+// string.
+func (aca APIClientAuthorization) TokenV2() string {
+       const sep = "/"
+       return "v2" + sep + aca.UUID + sep + aca.APIToken
+}
index 353901855683f296811a42e64b008568071dbdad..6edd18418bb8015087f8b486acf6ee21d2d26db4 100644 (file)
@@ -56,6 +56,27 @@ type Cluster struct {
        NodeProfiles       map[string]NodeProfile
        InstanceTypes      InstanceTypeMap
        HTTPRequestTimeout Duration
+       RemoteClusters     map[string]RemoteCluster
+       PostgreSQL         PostgreSQL
+}
+
+// PostgreSQL is a cluster's database configuration: connection
+// parameters (Connection) and a pool size (ConnectionPool --
+// presumably the maximum number of open connections; TODO confirm
+// against the code that reads it).
+type PostgreSQL struct {
+       Connection     PostgreSQLConnection
+       ConnectionPool int
+}
+
+// PostgreSQLConnection is a set of named database connection
+// parameters, e.g. "host", "dbname", "user", "password".
+type PostgreSQLConnection map[string]string
+
+// RemoteCluster describes how to contact a remote cluster. Entries
+// are stored in Cluster.RemoteClusters, keyed by the remote
+// cluster's ID.
+type RemoteCluster struct {
+       // API endpoint host or host:port; default is {id}.arvadosapi.com
+       Host string
+       // Perform a proxy request when a local client requests an
+       // object belonging to this remote.
+       Proxy bool
+       // Scheme, default "https". Can be set to "http" for testing.
+       Scheme string
+       // Disable TLS verify. Can be set to true for testing.
+       Insecure bool
 }
 
 type InstanceType struct {
@@ -172,6 +193,7 @@ func (np *NodeProfile) ServicePorts() map[ServiceName]string {
 }
 
+// SystemServiceInstance holds the settings for one service within a
+// NodeProfile: its Listen address and whether it uses TLS.
+// NOTE(review): Insecure presumably disables TLS certificate
+// verification when connecting to the service -- confirm.
 type SystemServiceInstance struct {
-       Listen string
-       TLS    bool
+       Listen   string
+       TLS      bool
+       Insecure bool
 }
diff --git a/sdk/go/arvados/postgresql.go b/sdk/go/arvados/postgresql.go
new file mode 100644 (file)
index 0000000..47953ce
--- /dev/null
@@ -0,0 +1,20 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+       "sort"
+       "strings"
+)
+
+// String returns the connection parameters as a connection string of
+// key='value' pairs, each followed by a single space. Keys are
+// lowercased; backslashes and single quotes in values are escaped
+// with a backslash.
+//
+// Pairs are emitted in sorted key order so that the same parameters
+// always produce the same string (map iteration order is random).
+func (c PostgreSQLConnection) String() string {
+       keys := make([]string, 0, len(c))
+       for k := range c {
+               keys = append(keys, k)
+       }
+       sort.Strings(keys)
+       s := ""
+       for _, k := range keys {
+               s += strings.ToLower(k)
+               s += "='"
+               // Escape backslashes first so that the backslash
+               // added in front of each quote is not doubled.
+               s += strings.Replace(
+                       strings.Replace(c[k], `\`, `\\`, -1),
+                       `'`, `\'`, -1)
+               s += "' "
+       }
+       return s
+}
index a434690775089c38a092499ae79f7fa0fcdec0e0..6a4b6232aceb82754dbee606504a8608ba96d054 100644 (file)
@@ -46,6 +46,8 @@ const (
 
        FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
        FooCollectionSharingToken     = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+
+       WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml"
 )
 
 // PathologicalManifest : A valid manifest designed to test
index ea492430e41297ddb8465c73b62c477e20af2357..ad1d398c763d7eaacefefcde8993e39044582f2a 100644 (file)
@@ -34,7 +34,7 @@ var EncodeTokenCookie func([]byte) string = base64.URLEncoding.EncodeToString
 // token.
 var DecodeTokenCookie func(string) ([]byte, error) = base64.URLEncoding.DecodeString
 
-// LoadTokensFromHttpRequest loads all tokens it can find in the
+// LoadTokensFromHTTPRequest loads all tokens it can find in the
 // headers and query string of an http query.
 func (a *Credentials) LoadTokensFromHTTPRequest(r *http.Request) {
        // Load plain token from "Authorization: OAuth2 ..." header
@@ -83,7 +83,21 @@ func (a *Credentials) loadTokenFromCookie(r *http.Request) {
        a.Tokens = append(a.Tokens, string(token))
 }
 
-// TODO: LoadTokensFromHttpRequestBody(). We can't assume in
-// LoadTokensFromHttpRequest() that [or how] we should read and parse
-// the request body. This has to be requested explicitly by the
-// application.
+// LoadTokensFromHTTPRequestBody loads credentials from the request
+// body: if the Content-Type header is exactly
+// "application/x-www-form-urlencoded", the form is parsed and a
+// non-empty "api_token" form value is appended to a.Tokens. Any
+// other Content-Type value (including one that carries parameters,
+// such as "; charset=UTF-8") is ignored and nil is returned.
+//
+// This is separate from LoadTokensFromHTTPRequest because it's not
+// always desirable to read the request body. This has to be requested
+// explicitly by the application.
+func (a *Credentials) LoadTokensFromHTTPRequestBody(r *http.Request) error {
+       const formType = "application/x-www-form-urlencoded"
+       if contentType := r.Header.Get("Content-Type"); contentType != formType {
+               return nil
+       }
+       err := r.ParseForm()
+       if err != nil {
+               return err
+       }
+       token := r.PostFormValue("api_token")
+       if token == "" {
+               return nil
+       }
+       a.Tokens = append(a.Tokens, token)
+       return nil
+}
diff --git a/sdk/go/auth/salt.go b/sdk/go/auth/salt.go
new file mode 100644 (file)
index 0000000..667a30f
--- /dev/null
@@ -0,0 +1,48 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package auth
+
+import (
+       "crypto/hmac"
+       "crypto/sha1"
+       "errors"
+       "fmt"
+       "io"
+       "regexp"
+       "strings"
+)
+
+// Errors returned by SaltToken:
+//
+// ErrObsoleteToken: the token is not in v2 format but matches
+// reObsoleteToken (41 or more characters from [0-9a-z]).
+//
+// ErrTokenFormat: the token is neither in v2 format nor matched by
+// reObsoleteToken.
+//
+// ErrSalted: the token has already been salted, and not for the
+// requested remote.
+var (
+       reObsoleteToken  = regexp.MustCompile(`^[0-9a-z]{41,}$`)
+       ErrObsoleteToken = errors.New("obsolete token format")
+       ErrTokenFormat   = errors.New("badly formatted token")
+       ErrSalted        = errors.New("token already salted")
+)
+
+// SaltToken returns a salted version of token for use with remote
+// (presumably a remote cluster ID -- an already-salted token is
+// accepted only if its UUID starts with remote).
+//
+// token must have the form "v2/{uuid}/{secret}". If the secret is
+// not 40 characters long it is treated as unsalted, and is replaced
+// with the hex-encoded HMAC-SHA1 of remote, keyed with the secret. A
+// 40-character secret is treated as already salted: token is
+// returned unchanged if uuid starts with remote, otherwise ErrSalted
+// is returned.
+//
+// If token is not in v2 format, the error is ErrObsoleteToken when
+// token matches reObsoleteToken, and ErrTokenFormat otherwise.
+func SaltToken(token, remote string) (string, error) {
+       parts := strings.Split(token, "/")
+       if len(parts) < 3 || parts[0] != "v2" {
+               if reObsoleteToken.MatchString(token) {
+                       return "", ErrObsoleteToken
+               }
+               return "", ErrTokenFormat
+       }
+       uuid, secret := parts[1], parts[2]
+       switch {
+       case len(secret) != 40:
+               // not already salted
+               mac := hmac.New(sha1.New, []byte(secret))
+               // Writing to a hash.Hash never returns an error.
+               io.WriteString(mac, remote)
+               return "v2/" + uuid + "/" + fmt.Sprintf("%x", mac.Sum(nil)), nil
+       case strings.HasPrefix(uuid, remote):
+               // already salted for the desired remote
+               return token, nil
+       default:
+               // salted for a different remote, can't be used
+               return "", ErrSalted
+       }
+}
index 102433cd4186fbf392d8f2fc56af804bdec4d890..8df95553d49e825db63286d9125077b6c53682a6 100644 (file)
@@ -413,6 +413,13 @@ def run_controller():
         f.write("""
 Clusters:
   zzzzz:
+    PostgreSQL:
+      ConnectionPool: 32
+      Connection:
+        host: {}
+        dbname: {}
+        user: {}
+        password: {}
     NodeProfiles:
       "*":
         "arvados-controller":
@@ -420,7 +427,15 @@ Clusters:
         "arvados-api-server":
           Listen: ":{}"
           TLS: true
-        """.format(port, rails_api_port))
+          Insecure: true
+        """.format(
+            _dbconfig('host'),
+            _dbconfig('database'),
+            _dbconfig('username'),
+            _dbconfig('password'),
+            port,
+            rails_api_port,
+        ))
     logf = open(_logfilename('controller'), 'a')
     controller = subprocess.Popen(
         ["arvados-server", "controller", "-config", conf],
index 594dc436297224463f860fe3d74cb9d718e02bdf..f0992c18314ac22bc60034855a204ab3aedce796 100644 (file)
@@ -25,7 +25,7 @@ class StaticController < ApplicationController
   end
 
+  # Render an empty text response.
   def empty
-    render text: "-"
+    render text: ""
   end
 
 end
index d4cf5114399576387eddd8f169005d5a142a8d0f..ead1ec20c6a1de471e82f2e82ad0c24b5e3a4b93 100644 (file)
@@ -12,7 +12,7 @@ import (
 
 type wsConfig struct {
        Client       arvados.Client
-       Postgres     pgConfig
+       Postgres     arvados.PostgreSQLConnection
        PostgresPool int
        Listen       string
        LogLevel     string
@@ -30,7 +30,7 @@ func defaultConfig() wsConfig {
                Client: arvados.Client{
                        APIHost: "localhost:443",
                },
-               Postgres: pgConfig{
+               Postgres: arvados.PostgreSQLConnection{
                        "dbname":                    "arvados_production",
                        "user":                      "arvados",
                        "password":                  "xyzzy",
index 9acfca50e4db639c04dda22a7040d2e91a1c1c4c..309dab7a403e54cc5cb24daaf312dc1b5baa72f2 100644 (file)
@@ -8,7 +8,6 @@ import (
        "context"
        "database/sql"
        "strconv"
-       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -17,21 +16,6 @@ import (
        "github.com/lib/pq"
 )
 
-type pgConfig map[string]string
-
-func (c pgConfig) ConnectionString() string {
-       s := ""
-       for k, v := range c {
-               s += k
-               s += "='"
-               s += strings.Replace(
-                       strings.Replace(v, `\`, `\\`, -1),
-                       `'`, `\'`, -1)
-               s += "' "
-       }
-       return s
-}
-
 type pgEventSource struct {
        DataSource   string
        MaxOpenConns int
index ea6063a0c3a718dde7baa52a7a9aa5504b0e5f16..ac5d130d61bdd85dfc568bf91c37b983994ae40c 100644 (file)
@@ -7,10 +7,12 @@ package main
 import (
        "database/sql"
        "fmt"
+       "os"
+       "path/filepath"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        check "gopkg.in/check.v1"
 )
 
@@ -18,30 +20,20 @@ var _ = check.Suite(&eventSourceSuite{})
 
 type eventSourceSuite struct{}
 
-func testDBConfig() pgConfig {
-       var railsDB struct {
-               Test struct {
-                       Database string
-                       Username string
-                       Password string
-                       Host     string
-               }
-       }
-       err := config.LoadFile(&railsDB, "../api/config/database.yml")
+// testDBConfig returns the PostgreSQL connection parameters of the
+// "zzzzz" cluster configured in $WORKSPACE/tmp/arvados.yml. It
+// panics if the config file cannot be loaded or the cluster cannot
+// be retrieved from it.
+func testDBConfig() arvados.PostgreSQLConnection {
+       cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
        if err != nil {
                panic(err)
        }
-       cfg := pgConfig{
-               "dbname":   railsDB.Test.Database,
-               "host":     railsDB.Test.Host,
-               "password": railsDB.Test.Password,
-               "user":     railsDB.Test.Username,
+       cc, err := cfg.GetCluster("zzzzz")
+       if err != nil {
+               panic(err)
        }
-       return cfg
+       return cc.PostgreSQL.Connection
 }
 
 func testDB() *sql.DB {
-       db, err := sql.Open("postgres", testDBConfig().ConnectionString())
+       db, err := sql.Open("postgres", testDBConfig().String())
        if err != nil {
                panic(err)
        }
@@ -52,7 +44,7 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
        cfg := testDBConfig()
        db := testDB()
        pges := &pgEventSource{
-               DataSource: cfg.ConnectionString(),
+               DataSource: cfg.String(),
                QueueSize:  4,
        }
        go pges.Run()
index 36ce7ae59f15cf9ec2a6fd1609aad1fdd2acd23d..eda7ff2a486a0f9ae59ddc12bf696e3e7a8059c5 100644 (file)
@@ -48,7 +48,7 @@ func (srv *server) setup() {
 
        srv.listener = ln
        srv.eventSource = &pgEventSource{
-               DataSource:   srv.wsConfig.Postgres.ConnectionString(),
+               DataSource:   srv.wsConfig.Postgres.String(),
                MaxOpenConns: srv.wsConfig.PostgresPool,
                QueueSize:    srv.wsConfig.ServerEventQueue,
        }