import (
"bufio"
"bytes"
+ "context"
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"regexp"
"strings"
+ "sync"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
if remote.Insecure {
client = h.insecureClient
}
+ log.Printf("Remote cluster request to %v %v", remoteID, urlOut)
h.proxy.Do(w, req, urlOut, client, filter)
}
type rewriteSignaturesClusterId string // a cluster ID; serves as the receiver of rewriteSignatures
-func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response) (newResponse *http.Response, err error) {
+func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ return resp, requestError
+ }
+
if resp.StatusCode != 200 {
return resp, nil
}
}
type searchLocalClusterForPDH struct {
- needSearchFederation bool
+ sentResponse bool // written by filterLocalClusterResponse: false after a suppressed 404, true once a response is passed through
}
-func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response) (newResponse *http.Response, err error) {
+func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) { // response filter handed to proxy.Do for the local-cluster lookup
+ if requestError != nil {
+ return resp, requestError // request errors are handed back unchanged; sentResponse is not touched
+ }
+
if resp.StatusCode == 404 {
// Suppress returning this result, because we want to
// search the federation.
- s.needSearchFederation = true
+ s.sentResponse = false
return nil, nil
}
+ s.sentResponse = true // any status other than 404 is returned as-is
return resp, nil
}
+type searchRemoteClusterForPDH struct {
+ remoteID string // ID of the remote cluster this search queries
+ mtx *sync.Mutex // held by filterRemoteClusterResponse while it reads or writes sentResponse, errors and statusCode
+ sentResponse *bool // set to true once a successful response has been selected
+ sharedContext *context.Context // pointer to the cancellable context created in ServeHTTP; not read by filterRemoteClusterResponse
+ cancelFunc func() // called right after the first successful response is selected
+ errors *[]string // one message appended per failed attempt (request error, non-200 status, or parse error)
+ statusCode *int // 404 while every non-200 response so far was a 404; 502 once any other non-200 status is seen
+}
+
+func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) { // response filter for one remote cluster: only the first successful response is let through
+ s.mtx.Lock()
+ defer s.mtx.Unlock() // pairs with the re-Lock further down: the mutex is held again before every later return
+
+ if *s.sentResponse {
+ // Another request already returned a response
+ return nil, nil
+ }
+
+ if requestError != nil {
+ *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
+ // Record the error and suppress response
+ return nil, nil
+ }
+
+ if resp.StatusCode != 200 {
+ // Suppress returning unsuccessful result. Maybe
+ // another request will find it.
+ // TODO collect and return error responses.
+ *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
+ if resp.StatusCode == 404 && (*s.statusCode == 0 || *s.statusCode == 404) {
+ // Only return 404 if every response is 404
+ *s.statusCode = http.StatusNotFound
+ } else {
+ // Got a non-404 error response, convert into BadGateway
+ *s.statusCode = http.StatusBadGateway
+ }
+ return nil, nil
+ }
+
+ s.mtx.Unlock() // released only for the rewriteSignatures call; re-acquired immediately afterwards
+
+ // This reads the response body. We don't want to hold the
+ // lock while doing this because other remote requests could
+ // also have made it to this point, and we don't want a
+ // slow response holding the lock to block a faster response
+ // that is waiting on the lock.
+ newResponse, err = rewriteSignaturesClusterId(s.remoteID).rewriteSignatures(resp, nil)
+
+ s.mtx.Lock()
+
+ if *s.sentResponse {
+ // Another request already returned a response
+ return nil, nil
+ }
+
+ if err != nil {
+ // Suppress returning unsuccessful result. Maybe
+ // another request will be successful.
+ *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
+ return nil, nil
+ }
+
+ // We have a successful response. Suppress/cancel all the
+ // other requests/responses.
+ *s.sentResponse = true
+ s.cancelFunc()
+
+ return newResponse, nil
+}
+
func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
if len(m) == 2 {
+ bearer := req.Header.Get("Authorization")
+ // "Bearer v2/" is 10 bytes and the next 5 bytes are compared
+ // with the local ClusterID. Require at least 15 bytes before
+ // slicing: a shorter header (e.g. "Bearer v2/abc") would
+ // otherwise panic with a slice-bounds error.
+ if strings.HasPrefix(bearer, "Bearer v2/") &&
+ len(bearer) >= 15 &&
+ bearer[10:15] != h.handler.Cluster.ClusterID {
+ // Salted token from another cluster, just
+ // fall back to query local cluster only.
+ h.next.ServeHTTP(w, req)
+ return
+ }
+
urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusInternalServerError)
if insecure {
client = h.handler.insecureClient
}
- sf := &searchLocalClusterForPDH{false}
+ sf := &searchLocalClusterForPDH{}
h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
- if !sf.needSearchFederation {
- // a response was sent
+ if sf.sentResponse {
+ // a response was sent, nothing more to do
return
}
+
+ sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+ req = req.WithContext(sharedContext)
+
+ // Create a goroutine that will contact each cluster
+ // in the RemoteClusters map. The first one to return
+ // a valid result gets returned to the client. When
+ // that happens, all other outstanding requests are
+ // cancelled or suppressed.
+ sentResponse := false
+ mtx := sync.Mutex{}
+ wg := sync.WaitGroup{}
+ var errors []string
+ var errorCode int = 0
+ for remoteID := range h.handler.Cluster.RemoteClusters {
+ search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
+ &sharedContext, cancelFunc, &errors, &errorCode}
+ wg.Add(1)
+ go func() {
+ h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ if sentResponse {
+ return
+ }
+
+ if errorCode == 0 {
+ errorCode = http.StatusBadGateway
+ }
+
+ // No successful responses, so return an error
+ httpserver.Errors(w, errors, errorCode)
+ return
}
m = collectionRe.FindStringSubmatch(req.URL.Path)
`)
}
+func (s *FederationSuite) TestGetRemoteCollectionError(c *check.C) {
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/zzzzz-4zz18-fakefakefakefak", nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
+}
+
func (s *FederationSuite) TestSignedLocatorPattern(c *check.C) {
// Confirm the regular expression identifies other groups of hints correctly
c.Check(keepclient.SignedLocatorRe.FindStringSubmatch(`6a4ff0499484c6c79c95cd8c566bd25f+249025+B1+C2+A05227438989d04712ea9ca1c91b556cef01d5cc7@5ba5405b+D3+E4`),
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var col arvados.Collection
+ c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
+ c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
+ c.Check(col.ManifestText, check.Matches,
+ `\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+Rzzzzz-[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
+`)
+}
+
+// TestGetCollectionByPDHError points the handler's RailsAPI at a stub
+// server that answers 404 to everything, requests a collection by
+// portable data hash, and checks that the result is 502 Bad Gateway.
+func (s *FederationSuite) TestGetCollectionByPDHError(c *check.C) {
+ // Stub API server: every request gets a bare 404.
+ alwaysNotFound := func(w http.ResponseWriter, _ *http.Request) {
+ w.WriteHeader(http.StatusNotFound)
+ }
+ stub := &httpserver.Server{
+ Server: http.Server{Handler: http.HandlerFunc(alwaysNotFound)},
+ }
+ c.Assert(stub.Start(), check.IsNil)
+ defer stub.Close()
+
+ // Point the "local" cluster at the stub so the local lookup
+ // always comes back 404.
+ railsAPI := arvados.SystemServiceInstance{Listen: stub.Addr, TLS: false, Insecure: true}
+ profile := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: railsAPI,
+ }
+ s.testHandler.Cluster.NodeProfiles["*"] = profile
+ s.testHandler.NodeProfile = &profile
+
+ request := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+ request.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+ response := s.testRequest(request)
+ defer response.Body.Close()
+
+ c.Check(response.StatusCode, check.Equals, http.StatusBadGateway)
+}
+
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDH(c *check.C) {
+ np := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+ TLS: true, Insecure: true}}
+ s.testHandler.Cluster.NodeProfiles["*"] = np
+ s.testHandler.NodeProfile = &np
+
+ req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
+ req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
+ resp := s.testRequest(req)
+
c.Check(resp.StatusCode, check.Equals, http.StatusOK)
var col arvados.Collection
c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
`\. 6a4ff0499484c6c79c95cd8c566bd25f\+249025\+A[0-9a-f]{40}@[0-9a-f]{8} 0:249025:GNU_General_Public_License,_version_3.pdf
`)
}
+
+// TestSaltedTokenGetCollectionByPDHError sends a "Bearer v2/" token
+// with a by-PDH collection request and checks that the handler
+// answers 404 Not Found.
+func (s *FederationSuite) TestSaltedTokenGetCollectionByPDHError(c *check.C) {
+ // RailsAPI address comes from the ARVADOS_TEST_API_HOST environment variable.
+ railsAPI := arvados.SystemServiceInstance{
+ Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
+ TLS: true,
+ Insecure: true,
+ }
+ profile := arvados.NodeProfile{
+ Controller: arvados.SystemServiceInstance{Listen: ":"},
+ RailsAPI: railsAPI,
+ }
+ s.testHandler.Cluster.NodeProfiles["*"] = profile
+ s.testHandler.NodeProfile = &profile
+
+ request := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
+ request.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
+
+ response := s.testRequest(request)
+ c.Check(response.StatusCode, check.Equals, http.StatusNotFound)
+}