14087: Fetch federated collection by PDH
author: Peter Amstutz <pamstutz@veritasgenetics.com>
Mon, 10 Sep 2018 19:28:35 +0000 (15:28 -0400)
committer: Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 11 Sep 2018 15:07:32 +0000 (11:07 -0400)
Implement parallel query of remotes.  The first response is returned to
the client; other responses in flight are cancelled.  Adds tests.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

lib/controller/federation.go
lib/controller/federation_test.go
lib/controller/proxy.go
sdk/go/httpserver/error.go

index d68f44d95b65ebd7afd2372c55a8b5fbf16d6cda..29280dac4b9b77db4416770a546e3ed7d25096cb 100644 (file)
@@ -7,14 +7,17 @@ package controller
 import (
        "bufio"
        "bytes"
+       "context"
        "database/sql"
        "encoding/json"
        "fmt"
        "io/ioutil"
+       "log"
        "net/http"
        "net/url"
        "regexp"
        "strings"
+       "sync"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
@@ -62,6 +65,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
        if remote.Insecure {
                client = h.insecureClient
        }
+       log.Printf("Remote cluster request to %v %v", remoteID, urlOut)
        h.proxy.Do(w, req, urlOut, client, filter)
 }
 
@@ -76,7 +80,11 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 
+// rewriteSignaturesClusterId is the ID of the cluster a response came
+// from.  Its rewriteSignatures method has the ResponseFilter signature
+// and is applied to responses from that cluster (for example by
+// filterRemoteClusterResponse).
 type rewriteSignaturesClusterId string
 
-func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response) (newResponse *http.Response, err error) {
+func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+       if requestError != nil {
+               return resp, requestError
+       }
+
        if resp.StatusCode != 200 {
                return resp, nil
        }
@@ -134,22 +142,108 @@ func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Respons
 }
 
+// searchLocalClusterForPDH holds the state of the local-cluster step of
+// a collection-by-PDH lookup; its filterLocalClusterResponse method is
+// the response filter for that step.
 type searchLocalClusterForPDH struct {
-       needSearchFederation bool
+       // sentResponse is true once filterLocalClusterResponse has let a
+       // response through to the client.  It is false after a 404 was
+       // suppressed, so that the caller goes on to search the federation.
+       sentResponse bool
 }
 
-func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response) (newResponse *http.Response, err error) {
+func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+       if requestError != nil {
+               return resp, requestError
+       }
+
        if resp.StatusCode == 404 {
                // Suppress returning this result, because we want to
                // search the federation.
-               s.needSearchFederation = true
+               s.sentResponse = false
                return nil, nil
        }
+       s.sentResponse = true
        return resp, nil
 }
 
+// searchRemoteClusterForPDH is the state for one of the parallel
+// remote-cluster lookups started by
+// collectionFederatedRequestHandler.ServeHTTP.  Each lookup has its own
+// remoteID; mtx and the pointer fields are shared by every lookup made
+// for the same client request.
+type searchRemoteClusterForPDH struct {
+       // remoteID is the ID of the remote cluster this lookup queries.
+       remoteID      string
+       // mtx is shared by all lookups; filterRemoteClusterResponse
+       // holds it while reading or updating the shared fields below.
+       mtx           *sync.Mutex
+       // sentResponse is set once some remote's response has been
+       // returned for delivery to the client; later ones are suppressed.
+       sentResponse  *bool
+       // NOTE(review): sharedContext is not read by
+       // filterRemoteClusterResponse; it appears to be unused.
+       sharedContext *context.Context
+       // cancelFunc cancels the shared request context; it is called
+       // when a lookup succeeds so the other requests are abandoned.
+       cancelFunc    func()
+       // errors accumulates one message per failed lookup; ServeHTTP
+       // sends them to the client if no lookup succeeds.
+       errors        *[]string
+       // statusCode is the status ServeHTTP reports if no lookup
+       // succeeds; it stays 0 until filterRemoteClusterResponse records
+       // one, in which case ServeHTTP falls back to 502.
+       statusCode    *int
+}
+
+func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+       s.mtx.Lock()
+       defer s.mtx.Unlock()
+
+       if *s.sentResponse {
+               // Another request already returned a response
+               return nil, nil
+       }
+
+       if requestError != nil {
+               *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
+               // Record the error and suppress response
+               return nil, nil
+       }
+
+       if resp.StatusCode != 200 {
+               // Suppress returning unsuccessful result.  Maybe
+               // another request will find it.
+               // TODO collect and return error responses.
+               *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
+               if resp.StatusCode == 404 && (*s.statusCode == 0 || *s.statusCode == 404) {
+                       // Only return 404 if every response is 404
+                       *s.statusCode = http.StatusNotFound
+               } else {
+                       // Got a non-404 error response, convert into BadGateway
+                       *s.statusCode = http.StatusBadGateway
+               }
+               return nil, nil
+       }
+
+       s.mtx.Unlock()
+
+       // This reads the response body.  We don't want to hold the
+       // lock while doing this because other remote requests could
+       // also have made it to this point, and we want don't want a
+       // slow response holding the lock to block a faster response
+       // that is waiting on the lock.
+       newResponse, err = rewriteSignaturesClusterId(s.remoteID).rewriteSignatures(resp, nil)
+
+       s.mtx.Lock()
+
+       if *s.sentResponse {
+               // Another request already returned a response
+               return nil, nil
+       }
+
+       if err != nil {
+               // Suppress returning unsuccessful result.  Maybe
+               // another request will be successful.
+               *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
+               return nil, nil
+       }
+
+       // We have a successful response.  Suppress/cancel all the
+       // other requests/responses.
+       *s.sentResponse = true
+       s.cancelFunc()
+
+       return newResponse, nil
+}
+
 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
        if len(m) == 2 {
+               bearer := req.Header.Get("Authorization")
+               if strings.HasPrefix(bearer, "Bearer v2/") &&
+                       len(bearer) > 10 &&
+                       bearer[10:15] != h.handler.Cluster.ClusterID {
+                       // Salted token from another cluster, just
+                       // fall back to query local cluster only.
+                       h.next.ServeHTTP(w, req)
+                       return
+               }
+
                urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
                if err != nil {
                        httpserver.Error(w, err.Error(), http.StatusInternalServerError)
@@ -167,12 +261,48 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
                if insecure {
                        client = h.handler.insecureClient
                }
-               sf := &searchLocalClusterForPDH{false}
+               sf := &searchLocalClusterForPDH{}
                h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
-               if !sf.needSearchFederation {
-                       // a response was sent
+               if sf.sentResponse {
+                       // a response was sent, nothing more to do
                        return
                }
+
+               sharedContext, cancelFunc := context.WithCancel(req.Context())
+               defer cancelFunc()
+               req = req.WithContext(sharedContext)
+
+               // Create a goroutine that will contact each cluster
+               // in the RemoteClusters map.  The first one to return
+               // a valid result gets returned to the client.  When
+               // that happens, all other outstanding requests are
+               // cancelled or suppressed.
+               sentResponse := false
+               mtx := sync.Mutex{}
+               wg := sync.WaitGroup{}
+               var errors []string
+               var errorCode int = 0
+               for remoteID := range h.handler.Cluster.RemoteClusters {
+                       search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
+                               &sharedContext, cancelFunc, &errors, &errorCode}
+                       wg.Add(1)
+                       go func() {
+                               h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
+                               wg.Done()
+                       }()
+               }
+               wg.Wait()
+               if sentResponse {
+                       return
+               }
+
+               if errorCode == 0 {
+                       errorCode = http.StatusBadGateway
+               }
+
+               // No successful responses, so return an error
+               httpserver.Errors(w, errors, errorCode)
+               return
        }
 
        m = collectionRe.FindStringSubmatch(req.URL.Path)
index 53c6b9518a1f303396448b883299805419ab72e6..0cb784abf3d57a520baf0cb1e7fdb0c77f967b07 100644 (file)
@@ -337,6 +337,13 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
 `)
 }
 
+// TestGetRemoteCollectionError requests a collection UUID with the
+// zzzzz prefix that presumably does not exist (the "fakefakefakefak"
+// suffix) and expects 404 Not Found.
+func (s *FederationSuite) TestGetRemoteCollectionError(c *check.C) {
+       const missingUUID = "zzzzz-4zz18-fakefakefakefak"
+       request := httptest.NewRequest("GET", "/arvados/v1/collections/"+missingUUID, nil)
+       request.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+       response := s.testRequest(request)
+       c.Check(response.StatusCode, check.Equals, http.StatusNotFound)
+}
+
 func (s *FederationSuite) TestSignedLocatorPattern(c *check.C) {
        // Confirm the regular expression identifies other groups of hints correctly
        c.Check(keepclient.SignedLocatorRe.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4`),
@@ -394,6 +401,56 @@ func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
        resp := s.testRequest(req)
 
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       var col arvados.Collection
+       c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+       c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+       c.Check(col.ManifestText, check.Matches,
+               `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
+
+func (s *FederationSuite) TestGetCollectionByPDHError(c *check.C) {
+       srv := &httpserver.Server{
+               Server: http.Server{
+                       Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+                               w.WriteHeader(404)
+                       }),
+               },
+       }
+
+       c.Assert(srv.Start(), check.IsNil)
+       defer srv.Close()
+
+       // Make the "local" cluster to a service that always returns 404
+       np := arvados.NodeProfile{
+               Controller: arvados.SystemServiceInstance{Listen: ":"},
+               RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
+                       TLS: false, Insecure: true}}
+       s.testHandler.Cluster.NodeProfiles["*"] = np
+       s.testHandler.NodeProfile = &np
+
+       req := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+       resp := s.testRequest(req)
+       defer resp.Body.Close()
+
+       c.Check(resp.StatusCode, check.Equals, http.StatusBadGateway)
+}
+
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDH(c *check.C) {
+       np := arvados.NodeProfile{
+               Controller: arvados.SystemServiceInstance{Listen: ":"},
+               RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+                       TLS: true, Insecure: true}}
+       s.testHandler.Cluster.NodeProfiles["*"] = np
+       s.testHandler.NodeProfile = &np
+
+       req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+       req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
+       resp := s.testRequest(req)
+
        c.Check(resp.StatusCode, check.Equals, http.StatusOK)
        var col arvados.Collection
        c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
@@ -402,3 +459,18 @@ func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
                `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
 `)
 }
+
+// TestSaltedTokenGetCollectionByPDHError requests a made-up PDH using a
+// salted v2 token whose UUID has the zzzzz prefix, and expects 404.
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDHError(c *check.C) {
+       profile := arvados.NodeProfile{
+               Controller: arvados.SystemServiceInstance{Listen: ":"},
+               RailsAPI: arvados.SystemServiceInstance{
+                       Listen:   os.Getenv("ARVADOS_TEST_API_HOST"),
+                       TLS:      true,
+                       Insecure: true,
+               },
+       }
+       s.testHandler.Cluster.NodeProfiles["*"] = profile
+       s.testHandler.NodeProfile = &profile
+
+       const saltedToken = "v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065"
+       request := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+       request.Header.Set("Authorization", "Bearer "+saltedToken)
+
+       response := s.testRequest(request)
+       c.Check(response.StatusCode, check.Equals, http.StatusNotFound)
+}
index 2702c560736c6f56be3f5f9bc699f35701f02b81..72267b8d9a7750605f765e232883a8963192b8c0 100644 (file)
@@ -32,7 +32,7 @@ var dropHeaders = map[string]bool{
        "Upgrade":           true,
 }
 
-type ResponseFilter func(*http.Response) (*http.Response, error)
+// ResponseFilter lets a caller of proxy.Do inspect or replace the
+// upstream response before it is relayed to the client.  It receives
+// the response and the error returned by client.Do.  When that error is
+// non-nil the response may be nil, so filters should check the error
+// first.  A non-nil error returned by the filter is reported to the
+// client as 502 Bad Gateway.
+//
+// NOTE(review): the federation filters return (nil, nil) to mean
+// "send nothing"; confirm Do handles a nil response that way.  Also,
+// Do reads resp.Body before calling the filter even when client.Do
+// failed, which would dereference a nil response.
+type ResponseFilter func(*http.Response, error) (*http.Response, error)
 
 func (p *proxy) Do(w http.ResponseWriter,
        reqIn *http.Request,
@@ -74,17 +74,19 @@ func (p *proxy) Do(w http.ResponseWriter,
        }).WithContext(ctx)
 
        resp, err := client.Do(reqOut)
-       if err != nil {
+       if filter == nil && err != nil {
                httpserver.Error(w, err.Error(), http.StatusBadGateway)
                return
        }
 
        // make sure original response body gets closed
        originalBody := resp.Body
-       defer originalBody.Close()
+       if originalBody != nil {
+               defer originalBody.Close()
+       }
 
        if filter != nil {
-               resp, err = filter(resp)
+               resp, err = filter(resp, err)
 
                if err != nil {
                        httpserver.Error(w, err.Error(), http.StatusBadGateway)
@@ -102,6 +104,7 @@ func (p *proxy) Do(w http.ResponseWriter,
                        defer resp.Body.Close()
                }
        }
+
        for k, v := range resp.Header {
                for _, v := range v {
                        w.Header().Add(k, v)
index 398e61fcd081097fcdd3b1d79375d5f3347c09ae..1ccf8c04782fbf57aedfe6cb20f75c50ef53cb9d 100644 (file)
@@ -19,3 +19,10 @@ func Error(w http.ResponseWriter, error string, code int) {
        w.WriteHeader(code)
        json.NewEncoder(w).Encode(ErrorResponse{Errors: []string{error}})
 }
+
+func Errors(w http.ResponseWriter, errors []string, code int) {
+       w.Header().Set("Content-Type", "application/json")
+       w.Header().Set("X-Content-Type-Options", "nosniff")
+       w.WriteHeader(code)
+       json.NewEncoder(w).Encode(ErrorResponse{Errors: errors})
+}