X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/23ddce7f83a4ab2e39b5910766f54aafb7b5a99d..40500de7ecb33a0f2197b2fe2449e6fc14b835a4:/lib/controller/fed_collections.go diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go index 62f98367cc..a0a123129f 100644 --- a/lib/controller/fed_collections.go +++ b/lib/controller/fed_collections.go @@ -17,16 +17,11 @@ import ( "strings" "sync" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/keepclient" ) -type collectionFederatedRequestHandler struct { - next http.Handler - handler *Handler -} - func rewriteSignatures(clusterID string, expectHash string, resp *http.Response, requestError error) (newResponse *http.Response, err error) { @@ -34,7 +29,7 @@ func rewriteSignatures(clusterID string, expectHash string, return resp, requestError } - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { return resp, nil } @@ -140,7 +135,7 @@ func filterLocalClusterResponse(resp *http.Response, requestError error) (newRes return resp, requestError } - if resp.StatusCode == 404 { + if resp.StatusCode == http.StatusNotFound { // Suppress returning this result, because we want to // search the federation. return nil, nil @@ -159,93 +154,52 @@ type searchRemoteClusterForPDH struct { statusCode *int } -func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) { - s.mtx.Lock() - defer s.mtx.Unlock() +func fetchRemoteCollectionByUUID( + h *genericFederatedRequestHandler, + effectiveMethod string, + clusterID *string, + uuid string, + remainder string, + w http.ResponseWriter, + req *http.Request) bool { - if *s.sentResponse { - // Another request already returned a response - return nil, nil - } - - if requestError != nil { - *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError)) - // Record the error and suppress response - return nil, nil + if effectiveMethod != "GET" { + // Only handle GET requests right now + return false } - if resp.StatusCode != 200 { - // Suppress returning unsuccessful result. Maybe - // another request will find it. - // TODO collect and return error responses. - *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status)) - if resp.StatusCode != 404 { - // Got a non-404 error response, convert into BadGateway - *s.statusCode = http.StatusBadGateway + if uuid != "" { + // Collection UUID GET request + *clusterID = uuid[0:5] + if *clusterID != "" && *clusterID != h.handler.Cluster.ClusterID { + // request for remote collection by uuid + resp, err := h.handler.remoteClusterRequest(*clusterID, req) + newResponse, err := rewriteSignatures(*clusterID, "", resp, err) + h.handler.proxy.ForwardResponse(w, newResponse, err) + return true } - return nil, nil } - s.mtx.Unlock() - - // This reads the response body. We don't want to hold the - // lock while doing this because other remote requests could - // also have made it to this point, and we don't want a - // slow response holding the lock to block a faster response - // that is waiting on the lock. - newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil) - - s.mtx.Lock() - - if *s.sentResponse { - // Another request already returned a response - return nil, nil - } - - if err != nil { - // Suppress returning unsuccessful result. Maybe - // another request will be successful. - *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err)) - return nil, nil - } - - // We have a successful response. Suppress/cancel all the - // other requests/responses. - *s.sentResponse = true - s.cancelFunc() - - return newResponse, nil + return false } -func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if req.Method != "GET" { +func fetchRemoteCollectionByPDH( + h *genericFederatedRequestHandler, + effectiveMethod string, + clusterID *string, + uuid string, + remainder string, + w http.ResponseWriter, + req *http.Request) bool { + + if effectiveMethod != "GET" { // Only handle GET requests right now - h.next.ServeHTTP(w, req) - return + return false } - m := collectionByPDHRe.FindStringSubmatch(req.URL.Path) + m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path) if len(m) != 2 { - // Not a collection PDH GET request - m = collectionRe.FindStringSubmatch(req.URL.Path) - clusterId := "" - - if len(m) > 0 { - clusterId = m[2] - } - - if clusterId != "" && clusterId != h.handler.Cluster.ClusterID { - // request for remote collection by uuid - resp, err := h.handler.remoteClusterRequest(clusterId, req) - newResponse, err := rewriteSignatures(clusterId, "", resp, err) - h.handler.proxy.ForwardResponse(w, newResponse, err) - return - } - // not a collection UUID request, or it is a request - // for a local UUID, either way, continue down the - // handler stack. - h.next.ServeHTTP(w, req) - return + return false } // Request for collection by PDH. Search the federation. @@ -255,58 +209,104 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req newResp, err := filterLocalClusterResponse(resp, err) if newResp != nil || err != nil { h.handler.proxy.ForwardResponse(w, newResp, err) - return + return true } - sharedContext, cancelFunc := context.WithCancel(req.Context()) - defer cancelFunc() - req = req.WithContext(sharedContext) - // Create a goroutine for each cluster in the // RemoteClusters map. The first valid result gets // returned to the client. When that happens, all - // other outstanding requests are cancelled or - // suppressed. - sentResponse := false - mtx := sync.Mutex{} + // other outstanding requests are cancelled + sharedContext, cancelFunc := context.WithCancel(req.Context()) + defer cancelFunc() + + req = req.WithContext(sharedContext) wg := sync.WaitGroup{} - var errors []string - var errorCode int = 404 + pdh := m[1] + success := make(chan *http.Response) + errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters)) + + acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification) - // use channel as a semaphore to limit the number of concurrent - // requests at a time - sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency()) - defer close(sem) for remoteID := range h.handler.Cluster.RemoteClusters { if remoteID == h.handler.Cluster.ClusterID { // No need to query local cluster again continue } - // blocks until it can put a value into the - // channel (which has a max queue capacity) - sem <- true - if sentResponse { - break + if remoteID == "*" { + // This isn't a real remote cluster: it just sets defaults for unlisted remotes. + continue } - search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse, - &sharedContext, cancelFunc, &errors, &errorCode} + wg.Add(1) - go func() { - resp, err := h.handler.remoteClusterRequest(search.remoteID, req) - newResp, err := search.filterRemoteClusterResponse(resp, err) - if newResp != nil || err != nil { - h.handler.proxy.ForwardResponse(w, newResp, err) + go func(remote string) { + defer wg.Done() + acquire() + defer release() + select { + case <-sharedContext.Done(): + return + default: } - wg.Done() - <-sem - }() - } - wg.Wait() - if sentResponse { - return + resp, err := h.handler.remoteClusterRequest(remote, req) + wasSuccess := false + defer func() { + if resp != nil && !wasSuccess { + resp.Body.Close() + } + }() + if err != nil { + errorChan <- err + return + } + if resp.StatusCode != http.StatusOK { + errorChan <- HTTPError{resp.Status, resp.StatusCode} + return + } + select { + case <-sharedContext.Done(): + return + default: + } + + newResponse, err := rewriteSignatures(remote, pdh, resp, nil) + if err != nil { + errorChan <- err + return + } + select { + case <-sharedContext.Done(): + case success <- newResponse: + wasSuccess = true + } + }(remoteID) + } + go func() { + wg.Wait() + cancelFunc() + }() + + errorCode := http.StatusNotFound + + for { + select { + case newResp = <-success: + h.handler.proxy.ForwardResponse(w, newResp, nil) + return true + case <-sharedContext.Done(): + var errors []string + for len(errorChan) > 0 { + err := <-errorChan + if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound { + errorCode = http.StatusBadGateway + } + errors = append(errors, err.Error()) + } + httpserver.Errors(w, errors, errorCode) + return true + } } - // No successful responses, so return the error - httpserver.Errors(w, errors, errorCode) + // shouldn't ever get here + return true }