13993: Support for federated collection requests by uuid
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Fri, 7 Sep 2018 19:52:52 +0000 (15:52 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Fri, 7 Sep 2018 19:52:52 +0000 (15:52 -0400)
Do remote record retrieval using a salted token, update the block
signatures to "remote" signatures (changing +A to +R and adding the
remote cluster id), and return it to the original client.

Refactor routing and proxying of federated requests to provide hooks to
support this and future types of requests.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

lib/controller/federation.go
lib/controller/federation_test.go
lib/controller/handler.go
lib/controller/proxy.go
lib/controller/shared.go [deleted file]

index c610a70ec7d0e7dbab97917126b7ee4907279199..3467a6de92916b35578194e54e2178f77312d4ac 100644 (file)
@@ -5,12 +5,16 @@
 package controller
 
 import (
+       "bufio"
        "bytes"
        "database/sql"
+       "encoding/json"
+       "fmt"
        "io/ioutil"
        "net/http"
        "net/url"
        "regexp"
+       "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
@@ -18,14 +22,19 @@ import (
 )
 
 var wfRe = regexp.MustCompile(`^/arvados/v1/workflows/([0-9a-z]{5})-[^/]+$`)
+var collectionRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-z]{5})-[^/]+$`)
 
-func (h *Handler) proxyRemoteCluster(w http.ResponseWriter, req *http.Request, next http.Handler) {
-       m := wfRe.FindStringSubmatch(req.URL.Path)
-       if len(m) < 2 || m[1] == h.Cluster.ClusterID {
-               next.ServeHTTP(w, req)
-               return
-       }
-       remoteID := m[1]
+type GenericFederatedRequestHandler struct {
+       next    http.Handler
+       handler *Handler
+}
+
+type CollectionFederatedRequestHandler struct {
+       next    http.Handler
+       handler *Handler
+}
+
+func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, req *http.Request, filter ResponseFilter) {
        remote, ok := h.Cluster.RemoteClusters[remoteID]
        if !ok {
                httpserver.Error(w, "no proxy available for cluster "+remoteID, http.StatusNotFound)
@@ -51,7 +60,98 @@ func (h *Handler) proxyRemoteCluster(w http.ResponseWriter, req *http.Request, n
        if remote.Insecure {
                client = h.insecureClient
        }
-       h.proxy.Do(w, req, urlOut, client)
+       h.proxy.Do(w, req, urlOut, client, filter)
+}
+
+func (h *GenericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       m := wfRe.FindStringSubmatch(req.URL.Path)
+       if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
+               h.next.ServeHTTP(w, req)
+               return
+       }
+       h.handler.remoteClusterRequest(m[1], w, req, nil)
+}
+
+var SignedLocatorPattern = regexp.MustCompile(
+       `^([0-9a-fA-F]{32}\+[0-9]+)((\+[B-Z][A-Za-z0-9@_-]*)*)(\+A[A-Za-z0-9@_-]*)((\+[B-Z][A-Za-z0-9@_-]*)*)$`)
+
+type rewriteSignaturesClusterId string
+
+func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response) (newResponse *http.Response, err error) {
+       if resp.StatusCode != 200 {
+               return resp, nil
+       }
+
+       originalBody := resp.Body
+       defer originalBody.Close()
+
+       var col arvados.Collection
+       err = json.NewDecoder(resp.Body).Decode(&col)
+       if err != nil {
+               return nil, err
+       }
+
+       // rewriting signatures will make manifest text 5-10% bigger so calculate
+       // capacity accordingly
+       updatedManifest := bytes.NewBuffer(make([]byte, 0, len(col.ManifestText)+(len(col.ManifestText)/10)))
+
+       scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
+       scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
+       for scanner.Scan() {
+               line := scanner.Text()
+               tokens := strings.Split(line, " ")
+               if len(tokens) < 3 {
+                       return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+               }
+
+               updatedManifest.WriteString(tokens[0])
+               for _, token := range tokens[1:] {
+                       updatedManifest.WriteString(" ")
+                       m := SignedLocatorPattern.FindStringSubmatch(token)
+                       if m != nil {
+                               // Rewrite the block signature to be a remote signature
+                               fmt.Fprintf(updatedManifest, "%s%s+R%s-%s%s", m[1], m[2], clusterId, m[4][2:], m[5])
+                       } else {
+                               updatedManifest.WriteString(token)
+                       }
+
+               }
+               updatedManifest.WriteString("\n")
+       }
+
+       col.ManifestText = updatedManifest.String()
+
+       newbody, err := json.Marshal(col)
+       if err != nil {
+               return nil, err
+       }
+
+       buf := bytes.NewBuffer(newbody)
+       resp.Body = ioutil.NopCloser(buf)
+       resp.ContentLength = int64(buf.Len())
+       resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+       return resp, nil
+}
+
+func (h *CollectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+       m := collectionRe.FindStringSubmatch(req.URL.Path)
+       if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
+               h.next.ServeHTTP(w, req)
+               return
+       }
+       h.handler.remoteClusterRequest(m[1], w, req,
+               rewriteSignaturesClusterId(m[1]).rewriteSignatures)
+}
+
+func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
+       mux := http.NewServeMux()
+       mux.Handle("/arvados/v1/workflows", next)
+       mux.Handle("/arvados/v1/workflows/", &GenericFederatedRequestHandler{next, h})
+       mux.Handle("/arvados/v1/collections/", &CollectionFederatedRequestHandler{next, h})
+       mux.Handle("/", next)
+
+       return mux
 }
 
 type CurrentUser struct {
index 268209280eaea8c5357e8550db1d3673e1d1baba..1b9bd11221654fe64fb76b20794aea44576f068e 100644 (file)
@@ -299,3 +299,25 @@ func (s *FederationSuite) checkJSONErrorMatches(c *check.C, resp *http.Response,
        c.Assert(len(jresp.Errors), check.Equals, 1)
        c.Check(jresp.Errors[0], check.Matches, re)
 }
+
+func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
+       req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementCollection, nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       resp := s.testRequest(req)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       var col arvados.Collection
+       c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+       c.Check(col.UUID, check.Equals, arvadostest.UserAgreementCollection)
+       c.Check(col.ManifestText, check.Matches,
+               `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+
+       // Confirm the regular expression identifies other groups of hints correctly
+       c.Check(SignedLocatorPattern.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4`),
+               check.DeepEquals,
+               []string{"6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4",
+                       "6a4ff0499484c6c79c95cd8c566bd25f+249025",
+                       "+B1+C2", "+C2",
+                       "+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b",
+                       "+D3+E4", "+E4"})
+}
index caa84a90cd683913a96d4398001a4923f9042d9e..2b41aba6bfabf2bf9a76d5b8c483d146eef5cc6a 100644 (file)
@@ -68,8 +68,7 @@ func (h *Handler) setup() {
        })
        hs := http.NotFoundHandler()
        hs = prepend(hs, h.proxyRailsAPI)
-       hs = prepend(hs, h.handleGoAPI)
-       hs = prepend(hs, h.proxyRemoteCluster)
+       hs = h.setupProxyRemoteCluster(hs)
        mux.Handle("/", hs)
        h.handlerStack = mux
 
@@ -139,7 +138,7 @@ func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next h
        if insecure {
                client = h.insecureClient
        }
-       h.proxy.Do(w, req, urlOut, client)
+       h.proxy.Do(w, req, urlOut, client, nil)
 }
 
 // For now, findRailsAPI always uses the rails API running on this
index 712071bef947bf02782427d4a739f3e74ee26c4c..8c643d752e6fe4d6266ec169d4ff84c3bcbf66d1 100644 (file)
@@ -32,7 +32,14 @@ var dropHeaders = map[string]bool{
        "Upgrade":           true,
 }
 
-func (p *proxy) Do(w http.ResponseWriter, reqIn *http.Request, urlOut *url.URL, client *http.Client) {
+type ResponseFilter func(*http.Response) (*http.Response, error)
+
+func (p *proxy) Do(w http.ResponseWriter,
+       reqIn *http.Request,
+       urlOut *url.URL,
+       client *http.Client,
+       filter ResponseFilter) {
+
        // Copy headers from incoming request, then add/replace proxy
        // headers like Via and X-Forwarded-For.
        hdrOut := http.Header{}
@@ -70,6 +77,30 @@ func (p *proxy) Do(w http.ResponseWriter, reqIn *http.Request, urlOut *url.URL,
                httpserver.Error(w, err.Error(), http.StatusBadGateway)
                return
        }
+
+       // make sure original response body gets closed
+       originalBody := resp.Body
+       defer originalBody.Close()
+
+       if filter != nil {
+               resp, err = filter(resp)
+
+               if err != nil {
+                       httpserver.Error(w, err.Error(), http.StatusBadGateway)
+                       return
+               }
+               if resp == nil {
+                       // filter() returned a nil response, this means suppress
+                       // writing a response, for the case where there might
+                       // be multiple response writers.
+                       return
+               }
+
+               // the filter gave us a new response body, make sure that gets closed too.
+               if resp.Body != originalBody {
+                       defer resp.Body.Close()
+               }
+       }
        for k, v := range resp.Header {
                for _, v := range v {
                        w.Header().Add(k, v)
diff --git a/lib/controller/shared.go b/lib/controller/shared.go
deleted file mode 100644 (file)
index 2d3ccc7..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-package controller
-
-import (
-       "database/sql"
-       "encoding/json"
-       "net/http"
-
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/auth"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-)
-
-func (h *Handler) groupsShared(w http.ResponseWriter, req *http.Request, currentUser CurrentUser) error {
-
-       db, err := h.db(req)
-       if err != nil {
-               return err
-       }
-
-       gl := arvados.GroupList{}
-
-       err = db.QueryRowContext(req.Context(), `SELECT count(uuid) from groups`).Scan(&gl.ItemsAvailable)
-       if err != nil {
-               return err
-       }
-
-       rows, err := db.QueryContext(req.Context(), `SELECT uuid, name, owner_uuid, group_class from groups limit 50`)
-       if err != nil {
-               return err
-       }
-
-       defer rows.Close()
-       for rows.Next() {
-               var g arvados.Group
-               rows.Scan(&g.UUID, &g.Name, &g.OwnerUUID, &g.GroupClass)
-               gl.Items = append(gl.Items, g)
-       }
-
-       enc := json.NewEncoder(w)
-       err = enc.Encode(gl)
-       if err != nil {
-               return err
-       }
-
-       return nil
-}
-
-func (h *Handler) handleGoAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
-       if req.URL.Path != "/arvados/v1/groups/shared" {
-               next.ServeHTTP(w, req)
-               return
-       }
-
-       // Check token and get user UUID
-
-       creds := auth.NewCredentials()
-       creds.LoadTokensFromHTTPRequest(req)
-
-       if len(creds.Tokens) == 0 {
-               httpserver.Error(w, "Not logged in", http.StatusForbidden)
-               return
-       }
-
-       currentUser := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}}
-       err := h.validateAPItoken(req, &currentUser)
-       if err != nil {
-               if err == sql.ErrNoRows {
-                       httpserver.Error(w, "Not logged in", http.StatusForbidden)
-               } else {
-                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
-               }
-               return
-       }
-
-       // Handle /arvados/v1/groups/shared
-
-       err = h.groupsShared(w, req, currentUser)
-       if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadRequest)
-       }
-}