Merge branch 'master' into 14874-protected-collection-properties
[arvados.git] / lib / controller / fed_collections.go
index ab49e39d12656c3f960e840f82c9f4974e59d32d..07daf2f90ef28b3199e856c93134aa5b6975fab3 100644 (file)
@@ -217,17 +217,15 @@ func fetchRemoteCollectionByPDH(
        // returned to the client.  When that happens, all
        // other outstanding requests are cancelled
        sharedContext, cancelFunc := context.WithCancel(req.Context())
+       defer cancelFunc()
+
        req = req.WithContext(sharedContext)
        wg := sync.WaitGroup{}
        pdh := m[1]
        success := make(chan *http.Response)
        errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
 
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-
-       defer cancelFunc()
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
 
        for remoteID := range h.handler.Cluster.RemoteClusters {
                if remoteID == h.handler.Cluster.ClusterID {
@@ -238,9 +236,8 @@ func fetchRemoteCollectionByPDH(
                wg.Add(1)
                go func(remote string) {
                        defer wg.Done()
-                       // blocks until it can put a value into the
-                       // channel (which has a max queue capacity)
-                       sem <- true
+                       acquire()
+                       defer release()
                        select {
                        case <-sharedContext.Done():
                                return
@@ -278,7 +275,6 @@ func fetchRemoteCollectionByPDH(
                        case success <- newResponse:
                                wasSuccess = true
                        }
-                       <-sem
                }(remoteID)
        }
        go func() {