package controller
import (
+ "bufio"
"bytes"
"database/sql"
+ "encoding/json"
+ "fmt"
"io/ioutil"
"net/http"
"net/url"
"regexp"
+ "strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
)
var wfRe = regexp.MustCompile(`^/arvados/v1/workflows/([0-9a-z]{5})-[^/]+$`)
+var collectionRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-z]{5})-[^/]+$`)
-func (h *Handler) proxyRemoteCluster(w http.ResponseWriter, req *http.Request, next http.Handler) {
- m := wfRe.FindStringSubmatch(req.URL.Path)
- if len(m) < 2 || m[1] == h.Cluster.ClusterID {
- next.ServeHTTP(w, req)
- return
- }
- remoteID := m[1]
+type GenericFederatedRequestHandler struct {
+ next http.Handler
+ handler *Handler
+}
+
+type CollectionFederatedRequestHandler struct {
+ next http.Handler
+ handler *Handler
+}
+
+func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, req *http.Request, filter ResponseFilter) {
remote, ok := h.Cluster.RemoteClusters[remoteID]
if !ok {
httpserver.Error(w, "no proxy available for cluster "+remoteID, http.StatusNotFound)
if remote.Insecure {
client = h.insecureClient
}
- h.proxy.Do(w, req, urlOut, client)
+ h.proxy.Do(w, req, urlOut, client, filter)
+}
+
+func (h *GenericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ m := wfRe.FindStringSubmatch(req.URL.Path)
+ if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
+ h.next.ServeHTTP(w, req)
+ return
+ }
+ h.handler.remoteClusterRequest(m[1], w, req, nil)
+}
+
+var SignedLocatorPattern = regexp.MustCompile(
+ `^([0-9a-fA-F]{32}\+[0-9]+)((\+[B-Z][A-Za-z0-9@_-]*)*)(\+A[A-Za-z0-9@_-]*)((\+[B-Z][A-Za-z0-9@_-]*)*)$`)
+
+type rewriteSignaturesClusterId string
+
+func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response) (newResponse *http.Response, err error) {
+ if resp.StatusCode != 200 {
+ return resp, nil
+ }
+
+ originalBody := resp.Body
+ defer originalBody.Close()
+
+ var col arvados.Collection
+ err = json.NewDecoder(resp.Body).Decode(&col)
+ if err != nil {
+ return nil, err
+ }
+
+ // rewriting signatures will make manifest text 5-10% bigger so calculate
+ // capacity accordingly
+ updatedManifest := bytes.NewBuffer(make([]byte, 0, len(col.ManifestText)+(len(col.ManifestText)/10)))
+
+ scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
+ scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
+ for scanner.Scan() {
+ line := scanner.Text()
+ tokens := strings.Split(line, " ")
+ if len(tokens) < 3 {
+ return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
+ }
+
+ updatedManifest.WriteString(tokens[0])
+ for _, token := range tokens[1:] {
+ updatedManifest.WriteString(" ")
+ m := SignedLocatorPattern.FindStringSubmatch(token)
+ if m != nil {
+ // Rewrite the block signature to be a remote signature
+ fmt.Fprintf(updatedManifest, "%s%s+R%s-%s%s", m[1], m[2], clusterId, m[4][2:], m[5])
+ } else {
+ updatedManifest.WriteString(token)
+ }
+
+ }
+ updatedManifest.WriteString("\n")
+ }
+
+ col.ManifestText = updatedManifest.String()
+
+ newbody, err := json.Marshal(col)
+ if err != nil {
+ return nil, err
+ }
+
+ buf := bytes.NewBuffer(newbody)
+ resp.Body = ioutil.NopCloser(buf)
+ resp.ContentLength = int64(buf.Len())
+ resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
+
+ return resp, nil
+}
+
+func (h *CollectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ m := collectionRe.FindStringSubmatch(req.URL.Path)
+ if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
+ h.next.ServeHTTP(w, req)
+ return
+ }
+ h.handler.remoteClusterRequest(m[1], w, req,
+ rewriteSignaturesClusterId(m[1]).rewriteSignatures)
+}
+
+func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
+ mux := http.NewServeMux()
+ mux.Handle("/arvados/v1/workflows", next)
+ mux.Handle("/arvados/v1/workflows/", &GenericFederatedRequestHandler{next, h})
+ mux.Handle("/arvados/v1/collections/", &CollectionFederatedRequestHandler{next, h})
+ mux.Handle("/", next)
+
+ return mux
}
type CurrentUser struct {
c.Assert(len(jresp.Errors), check.Equals, 1)
c.Check(jresp.Errors[0], check.Matches, re)
}
+
+func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementCollection, nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var col arvados.Collection
+ c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+ c.Check(col.UUID, check.Equals, arvadostest.UserAgreementCollection)
+ c.Check(col.ManifestText, check.Matches,
+ `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+
+ // Confirm the regular expression identifies other groups of hints correctly
+ c.Check(SignedLocatorPattern.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4`),
+ check.DeepEquals,
+ []string{"6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4",
+ "6a4ff0499484c6c79c95cd8c566bd25f+249025",
+ "+B1+C2", "+C2",
+ "+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b",
+ "+D3+E4", "+E4"})
+}
})
hs := http.NotFoundHandler()
hs = prepend(hs, h.proxyRailsAPI)
- hs = prepend(hs, h.handleGoAPI)
- hs = prepend(hs, h.proxyRemoteCluster)
+ hs = h.setupProxyRemoteCluster(hs)
mux.Handle("/", hs)
h.handlerStack = mux
if insecure {
client = h.insecureClient
}
- h.proxy.Do(w, req, urlOut, client)
+ h.proxy.Do(w, req, urlOut, client, nil)
}
// For now, findRailsAPI always uses the rails API running on this
"Upgrade": true,
}
-func (p *proxy) Do(w http.ResponseWriter, reqIn *http.Request, urlOut *url.URL, client *http.Client) {
+type ResponseFilter func(*http.Response) (*http.Response, error)
+
+func (p *proxy) Do(w http.ResponseWriter,
+ reqIn *http.Request,
+ urlOut *url.URL,
+ client *http.Client,
+ filter ResponseFilter) {
+
// Copy headers from incoming request, then add/replace proxy
// headers like Via and X-Forwarded-For.
hdrOut := http.Header{}
httpserver.Error(w, err.Error(), http.StatusBadGateway)
return
}
+
+ // make sure original response body gets closed
+ originalBody := resp.Body
+ defer originalBody.Close()
+
+ if filter != nil {
+ resp, err = filter(resp)
+
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadGateway)
+ return
+ }
+ if resp == nil {
+ // filter() returned a nil response, this means suppress
+ // writing a response, for the case where there might
+ // be multiple response writers.
+ return
+ }
+
+ // the filter gave us a new response body, make sure that gets closed too.
+ if resp.Body != originalBody {
+ defer resp.Body.Close()
+ }
+ }
for k, v := range resp.Header {
for _, v := range v {
w.Header().Add(k, v)
+++ /dev/null
-package controller
-
-import (
- "database/sql"
- "encoding/json"
- "net/http"
-
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/auth"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
-)
-
-func (h *Handler) groupsShared(w http.ResponseWriter, req *http.Request, currentUser CurrentUser) error {
-
- db, err := h.db(req)
- if err != nil {
- return err
- }
-
- gl := arvados.GroupList{}
-
- err = db.QueryRowContext(req.Context(), `SELECT count(uuid) from groups`).Scan(&gl.ItemsAvailable)
- if err != nil {
- return err
- }
-
- rows, err := db.QueryContext(req.Context(), `SELECT uuid, name, owner_uuid, group_class from groups limit 50`)
- if err != nil {
- return err
- }
-
- defer rows.Close()
- for rows.Next() {
- var g arvados.Group
- rows.Scan(&g.UUID, &g.Name, &g.OwnerUUID, &g.GroupClass)
- gl.Items = append(gl.Items, g)
- }
-
- enc := json.NewEncoder(w)
- err = enc.Encode(gl)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (h *Handler) handleGoAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
- if req.URL.Path != "/arvados/v1/groups/shared" {
- next.ServeHTTP(w, req)
- return
- }
-
- // Check token and get user UUID
-
- creds := auth.NewCredentials()
- creds.LoadTokensFromHTTPRequest(req)
-
- if len(creds.Tokens) == 0 {
- httpserver.Error(w, "Not logged in", http.StatusForbidden)
- return
- }
-
- currentUser := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}}
- err := h.validateAPItoken(req, ¤tUser)
- if err != nil {
- if err == sql.ErrNoRows {
- httpserver.Error(w, "Not logged in", http.StatusForbidden)
- } else {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- }
- return
- }
-
- // Handle /arvados/v1/groups/shared
-
- err = h.groupsShared(w, req, currentUser)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- }
-}