17755: Merge branch 'main' into 17755-add-singularity-to-compute-image
[arvados.git] / lib / controller / fed_collections.go
index b9cd20582951505fe7b43c07c490be9507535720..a0a123129fdacdae34bf8b216d3e6b766a6f5889 100644 (file)
@@ -17,16 +17,11 @@ import (
        "strings"
        "sync"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
 )
 
-type collectionFederatedRequestHandler struct {
-       next    http.Handler
-       handler *Handler
-}
-
 func rewriteSignatures(clusterID string, expectHash string,
        resp *http.Response, requestError error) (newResponse *http.Response, err error) {
 
@@ -159,35 +154,52 @@ type searchRemoteClusterForPDH struct {
        statusCode    *int
 }
 
-func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-       if req.Method != "GET" {
+func fetchRemoteCollectionByUUID(
+       h *genericFederatedRequestHandler,
+       effectiveMethod string,
+       clusterID *string,
+       uuid string,
+       remainder string,
+       w http.ResponseWriter,
+       req *http.Request) bool {
+
+       if effectiveMethod != "GET" {
                // Only handle GET requests right now
-               h.next.ServeHTTP(w, req)
-               return
+               return false
        }
 
-       m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
-       if len(m) != 2 {
-               // Not a collection PDH GET request
-               m = collectionRe.FindStringSubmatch(req.URL.Path)
-               clusterId := ""
-
-               if len(m) > 0 {
-                       clusterId = m[2]
-               }
-
-               if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
+       if uuid != "" {
+               // Collection UUID GET request
+               *clusterID = uuid[0:5]
+               if *clusterID != "" && *clusterID != h.handler.Cluster.ClusterID {
                        // request for remote collection by uuid
-                       resp, err := h.handler.remoteClusterRequest(clusterId, req)
-                       newResponse, err := rewriteSignatures(clusterId, "", resp, err)
+                       resp, err := h.handler.remoteClusterRequest(*clusterID, req)
+                       newResponse, err := rewriteSignatures(*clusterID, "", resp, err)
                        h.handler.proxy.ForwardResponse(w, newResponse, err)
-                       return
+                       return true
                }
-               // not a collection UUID request, or it is a request
-               // for a local UUID, either way, continue down the
-               // handler stack.
-               h.next.ServeHTTP(w, req)
-               return
+       }
+
+       return false
+}
+
+func fetchRemoteCollectionByPDH(
+       h *genericFederatedRequestHandler,
+       effectiveMethod string,
+       clusterID *string,
+       uuid string,
+       remainder string,
+       w http.ResponseWriter,
+       req *http.Request) bool {
+
+       if effectiveMethod != "GET" {
+               // Only handle GET requests right now
+               return false
+       }
+
+       m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path)
+       if len(m) != 2 {
+               return false
        }
 
        // Request for collection by PDH.  Search the federation.
@@ -197,7 +209,7 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
        newResp, err := filterLocalClusterResponse(resp, err)
        if newResp != nil || err != nil {
                h.handler.proxy.ForwardResponse(w, newResp, err)
-               return
+               return true
        }
 
        // Create a goroutine for each cluster in the
@@ -205,33 +217,31 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
        // returned to the client.  When that happens, all
        // other outstanding requests are cancelled
        sharedContext, cancelFunc := context.WithCancel(req.Context())
+       defer cancelFunc()
+
        req = req.WithContext(sharedContext)
        wg := sync.WaitGroup{}
        pdh := m[1]
        success := make(chan *http.Response)
-       errorChan := make(chan error)
-
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+       errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
 
-       defer close(errorChan)
-       defer close(success)
-       defer close(sem)
-       defer cancelFunc()
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
 
        for remoteID := range h.handler.Cluster.RemoteClusters {
                if remoteID == h.handler.Cluster.ClusterID {
                        // No need to query local cluster again
                        continue
                }
+               if remoteID == "*" {
+                       // This isn't a real remote cluster: it just sets defaults for unlisted remotes.
+                       continue
+               }
 
                wg.Add(1)
                go func(remote string) {
                        defer wg.Done()
-                       // blocks until it can put a value into the
-                       // channel (which has a max queue capacity)
-                       sem <- true
+                       acquire()
+                       defer release()
                        select {
                        case <-sharedContext.Done():
                                return
@@ -269,7 +279,6 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
                        case success <- newResponse:
                                wasSuccess = true
                        }
-                       <-sem
                }(remoteID)
        }
        go func() {
@@ -277,24 +286,27 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
                cancelFunc()
        }()
 
-       var errors []string
        errorCode := http.StatusNotFound
 
        for {
                select {
                case newResp = <-success:
                        h.handler.proxy.ForwardResponse(w, newResp, nil)
-                       return
-               case err := <-errorChan:
-                       if httperr, ok := err.(HTTPError); ok {
-                               if httperr.Code != http.StatusNotFound {
+                       return true
+               case <-sharedContext.Done():
+                       var errors []string
+                       for len(errorChan) > 0 {
+                               err := <-errorChan
+                               if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound {
                                        errorCode = http.StatusBadGateway
                                }
+                               errors = append(errors, err.Error())
                        }
-                       errors = append(errors, err.Error())
-               case <-sharedContext.Done():
                        httpserver.Errors(w, errors, errorCode)
-                       return
+                       return true
                }
        }
+
+       // shouldn't ever get here
+       return true
 }