Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / lib / controller / fed_collections.go
index ab49e39d12656c3f960e840f82c9f4974e59d32d..c33f5b28946ab430e8532195c50c8ff8ac478506 100644 (file)
@@ -17,9 +17,9 @@ import (
        "strings"
        "sync"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
 )
 
 func rewriteSignatures(clusterID string, expectHash string,
@@ -217,30 +217,31 @@ func fetchRemoteCollectionByPDH(
        // returned to the client.  When that happens, all
        // other outstanding requests are cancelled
        sharedContext, cancelFunc := context.WithCancel(req.Context())
+       defer cancelFunc()
+
        req = req.WithContext(sharedContext)
        wg := sync.WaitGroup{}
        pdh := m[1]
        success := make(chan *http.Response)
        errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
 
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-
-       defer cancelFunc()
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
 
        for remoteID := range h.handler.Cluster.RemoteClusters {
                if remoteID == h.handler.Cluster.ClusterID {
                        // No need to query local cluster again
                        continue
                }
+               if remoteID == "*" {
+                       // This isn't a real remote cluster: it just sets defaults for unlisted remotes.
+                       continue
+               }
 
                wg.Add(1)
                go func(remote string) {
                        defer wg.Done()
-                       // blocks until it can put a value into the
-                       // channel (which has a max queue capacity)
-                       sem <- true
+                       acquire()
+                       defer release()
                        select {
                        case <-sharedContext.Done():
                                return
@@ -278,7 +279,6 @@ func fetchRemoteCollectionByPDH(
                        case success <- newResponse:
                                wasSuccess = true
                        }
-                       <-sem
                }(remoteID)
        }
        go func() {
@@ -297,10 +297,8 @@ func fetchRemoteCollectionByPDH(
                        var errors []string
                        for len(errorChan) > 0 {
                                err := <-errorChan
-                               if httperr, ok := err.(HTTPError); ok {
-                                       if httperr.Code != http.StatusNotFound {
-                                               errorCode = http.StatusBadGateway
-                                       }
+                               if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound {
+                                       errorCode = http.StatusBadGateway
                                }
                                errors = append(errors, err.Error())
                        }