"strings"
"sync"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
)
func rewriteSignatures(clusterID string, expectHash string,
func fetchRemoteCollectionByUUID(
h *genericFederatedRequestHandler,
effectiveMethod string,
- clusterId *string,
+ clusterID *string,
uuid string,
remainder string,
w http.ResponseWriter,
if uuid != "" {
// Collection UUID GET request
- *clusterId = uuid[0:5]
- if *clusterId != "" && *clusterId != h.handler.Cluster.ClusterID {
+ *clusterID = uuid[0:5]
+ if *clusterID != "" && *clusterID != h.handler.Cluster.ClusterID {
// request for remote collection by uuid
- resp, err := h.handler.remoteClusterRequest(*clusterId, req)
- newResponse, err := rewriteSignatures(*clusterId, "", resp, err)
+ resp, err := h.handler.remoteClusterRequest(*clusterID, req)
+ newResponse, err := rewriteSignatures(*clusterID, "", resp, err)
h.handler.proxy.ForwardResponse(w, newResponse, err)
return true
}
func fetchRemoteCollectionByPDH(
h *genericFederatedRequestHandler,
effectiveMethod string,
- clusterId *string,
+ clusterID *string,
uuid string,
remainder string,
w http.ResponseWriter,
// returned to the client. When that happens, all
// other outstanding requests are cancelled
sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+
req = req.WithContext(sharedContext)
wg := sync.WaitGroup{}
pdh := m[1]
success := make(chan *http.Response)
errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-
- defer cancelFunc()
+ acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
for remoteID := range h.handler.Cluster.RemoteClusters {
if remoteID == h.handler.Cluster.ClusterID {
// No need to query local cluster again
continue
}
+ if remoteID == "*" {
+ // This isn't a real remote cluster: it just sets defaults for unlisted remotes.
+ continue
+ }
wg.Add(1)
go func(remote string) {
defer wg.Done()
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
+ acquire()
+ defer release()
select {
case <-sharedContext.Done():
return
case success <- newResponse:
wasSuccess = true
}
- <-sem
}(remoteID)
}
go func() {
var errors []string
for len(errorChan) > 0 {
err := <-errorChan
- if httperr, ok := err.(HTTPError); ok {
- if httperr.Code != http.StatusNotFound {
- errorCode = http.StatusBadGateway
- }
+ if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound {
+ errorCode = http.StatusBadGateway
}
errors = append(errors, err.Error())
}