X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7c2aa951ed9e7d0010fd58d59dc1e98c9d5e2800..654560f15c684635f2331363f04ad4cb3e9663d6:/lib/controller/fed_collections.go diff --git a/lib/controller/fed_collections.go b/lib/controller/fed_collections.go index ab49e39d12..07daf2f90e 100644 --- a/lib/controller/fed_collections.go +++ b/lib/controller/fed_collections.go @@ -217,17 +217,15 @@ func fetchRemoteCollectionByPDH( // returned to the client. When that happens, all // other outstanding requests are cancelled sharedContext, cancelFunc := context.WithCancel(req.Context()) + defer cancelFunc() + req = req.WithContext(sharedContext) wg := sync.WaitGroup{} pdh := m[1] success := make(chan *http.Response) errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters)) - // use channel as a semaphore to limit the number of concurrent - // requests at a time - sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency()) - - defer cancelFunc() + acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification) for remoteID := range h.handler.Cluster.RemoteClusters { if remoteID == h.handler.Cluster.ClusterID { @@ -238,9 +236,8 @@ func fetchRemoteCollectionByPDH( wg.Add(1) go func(remote string) { defer wg.Done() - // blocks until it can put a value into the - // channel (which has a max queue capacity) - sem <- true + acquire() + defer release() select { case <-sharedContext.Done(): return @@ -278,7 +275,6 @@ func fetchRemoteCollectionByPDH( case success <- newResponse: wasSuccess = true } - <-sem }(remoteID) } go func() {