X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/66d07bb91f91b0cd4c92c7ffa913f7181a3d4942..c8828ded72f562811dfe29ca20809dd5641f7a1d:/lib/controller/fed_collections.go diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go index ab49e39d12..c33f5b2894 100644 --- a/lib/controller/fed_collections.go +++ b/lib/controller/fed_collections.go @@ -17,9 +17,9 @@ import ( "strings" "sync" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/keepclient" ) func rewriteSignatures(clusterID string, expectHash string, @@ -217,30 +217,31 @@ func fetchRemoteCollectionByPDH( // returned to the client. When that happens, all // other outstanding requests are cancelled sharedContext, cancelFunc := context.WithCancel(req.Context()) + defer cancelFunc() + req = req.WithContext(sharedContext) wg := sync.WaitGroup{} pdh := m[1] success := make(chan *http.Response) errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters)) - // use channel as a semaphore to limit the number of concurrent - // requests at a time - sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency()) - - defer cancelFunc() + acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification) for remoteID := range h.handler.Cluster.RemoteClusters { if remoteID == h.handler.Cluster.ClusterID { // No need to query local cluster again continue } + if remoteID == "*" { + // This isn't a real remote cluster: it just sets defaults for unlisted remotes. + continue + } wg.Add(1) go func(remote string) { defer wg.Done() - // blocks until it can put a value into the - // channel (which has a max queue capacity) - sem <- true + acquire() + defer release() select { case <-sharedContext.Done(): return @@ -278,7 +279,6 @@ func fetchRemoteCollectionByPDH( case success <- newResponse: wasSuccess = true } - <-sem }(remoteID) } go func() { @@ -297,10 +297,8 @@ func fetchRemoteCollectionByPDH( var errors []string for len(errorChan) > 0 { err := <-errorChan - if httperr, ok := err.(HTTPError); ok { - if httperr.Code != http.StatusNotFound { - errorCode = http.StatusBadGateway - } + if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound { + errorCode = http.StatusBadGateway } errors = append(errors, err.Error()) }