X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3b26754a231ec909506f2ff28af1af9f2e27f2b..ab42005d9a8a4bbfae9d96ce320662a958decfcc:/lib/controller/fed_collections.go diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go index b9cd205829..a0a123129f 100644 --- a/lib/controller/fed_collections.go +++ b/lib/controller/fed_collections.go @@ -17,16 +17,11 @@ import ( "strings" "sync" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/keepclient" ) -type collectionFederatedRequestHandler struct { - next http.Handler - handler *Handler -} - func rewriteSignatures(clusterID string, expectHash string, resp *http.Response, requestError error) (newResponse *http.Response, err error) { @@ -159,35 +154,52 @@ type searchRemoteClusterForPDH struct { statusCode *int } -func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if req.Method != "GET" { +func fetchRemoteCollectionByUUID( + h *genericFederatedRequestHandler, + effectiveMethod string, + clusterID *string, + uuid string, + remainder string, + w http.ResponseWriter, + req *http.Request) bool { + + if effectiveMethod != "GET" { // Only handle GET requests right now - h.next.ServeHTTP(w, req) - return + return false } - m := collectionByPDHRe.FindStringSubmatch(req.URL.Path) - if len(m) != 2 { - // Not a collection PDH GET request - m = collectionRe.FindStringSubmatch(req.URL.Path) - clusterId := "" - - if len(m) > 0 { - clusterId = m[2] - } - - if clusterId != "" && clusterId != h.handler.Cluster.ClusterID { + if uuid != "" { + // Collection UUID GET request + *clusterID = uuid[0:5] + if *clusterID != "" && *clusterID != h.handler.Cluster.ClusterID { // request for remote collection by uuid - resp, err := h.handler.remoteClusterRequest(clusterId, req) - newResponse, err := rewriteSignatures(clusterId, "", resp, err) + resp, err := h.handler.remoteClusterRequest(*clusterID, req) + newResponse, err := rewriteSignatures(*clusterID, "", resp, err) h.handler.proxy.ForwardResponse(w, newResponse, err) - return + return true } - // not a collection UUID request, or it is a request - // for a local UUID, either way, continue down the - // handler stack. - h.next.ServeHTTP(w, req) - return + } + + return false +} + +func fetchRemoteCollectionByPDH( + h *genericFederatedRequestHandler, + effectiveMethod string, + clusterID *string, + uuid string, + remainder string, + w http.ResponseWriter, + req *http.Request) bool { + + if effectiveMethod != "GET" { + // Only handle GET requests right now + return false + } + + m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path) + if len(m) != 2 { + return false } // Request for collection by PDH. Search the federation. @@ -197,7 +209,7 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req newResp, err := filterLocalClusterResponse(resp, err) if newResp != nil || err != nil { h.handler.proxy.ForwardResponse(w, newResp, err) - return + return true } // Create a goroutine for each cluster in the @@ -205,33 +217,31 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req // returned to the client. When that happens, all // other outstanding requests are cancelled sharedContext, cancelFunc := context.WithCancel(req.Context()) + defer cancelFunc() + req = req.WithContext(sharedContext) wg := sync.WaitGroup{} pdh := m[1] success := make(chan *http.Response) - errorChan := make(chan error) - - // use channel as a semaphore to limit the number of concurrent - // requests at a time - sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency()) + errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters)) - defer close(errorChan) - defer close(success) - defer close(sem) - defer cancelFunc() + acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification) for remoteID := range h.handler.Cluster.RemoteClusters { if remoteID == h.handler.Cluster.ClusterID { // No need to query local cluster again continue } + if remoteID == "*" { + // This isn't a real remote cluster: it just sets defaults for unlisted remotes. + continue + } wg.Add(1) go func(remote string) { defer wg.Done() - // blocks until it can put a value into the - // channel (which has a max queue capacity) - sem <- true + acquire() + defer release() select { case <-sharedContext.Done(): return @@ -269,7 +279,6 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req case success <- newResponse: wasSuccess = true } - <-sem }(remoteID) } go func() { @@ -277,24 +286,27 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req cancelFunc() }() - var errors []string errorCode := http.StatusNotFound for { select { case newResp = <-success: h.handler.proxy.ForwardResponse(w, newResp, nil) - return - case err := <-errorChan: - if httperr, ok := err.(HTTPError); ok { - if httperr.Code != http.StatusNotFound { + return true + case <-sharedContext.Done(): + var errors []string + for len(errorChan) > 0 { + err := <-errorChan + if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound { errorCode = http.StatusBadGateway } + errors = append(errors, err.Error()) } - errors = append(errors, err.Error()) - case <-sharedContext.Done(): httpserver.Errors(w, errors, errorCode) - return + return true } } + + // shouldn't ever get here + return true }