"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
-type collectionFederatedRequestHandler struct {
- next http.Handler
- handler *Handler
-}
-
func rewriteSignatures(clusterID string, expectHash string,
resp *http.Response, requestError error) (newResponse *http.Response, err error) {
statusCode *int
}
-func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- if req.Method != "GET" {
+func fetchRemoteCollectionByUUID(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool {
+
+ if effectiveMethod != "GET" {
// Only handle GET requests right now
- h.next.ServeHTTP(w, req)
- return
+ return false
}
- m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
- if len(m) != 2 {
- // Not a collection PDH GET request
- m = collectionRe.FindStringSubmatch(req.URL.Path)
- clusterId := ""
-
- if len(m) > 0 {
- clusterId = m[2]
- }
-
- if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
+ if uuid != "" {
+ // Collection UUID GET request
+ *clusterId = uuid[0:5]
+ if *clusterId != "" && *clusterId != h.handler.Cluster.ClusterID {
// request for remote collection by uuid
- resp, cancel, err := h.handler.remoteClusterRequest(clusterId, req)
- if cancel != nil {
- defer cancel()
- }
- newResponse, err := rewriteSignatures(clusterId, "", resp, err)
+ resp, err := h.handler.remoteClusterRequest(*clusterId, req)
+ newResponse, err := rewriteSignatures(*clusterId, "", resp, err)
h.handler.proxy.ForwardResponse(w, newResponse, err)
- return
+ return true
}
- // not a collection UUID request, or it is a request
- // for a local UUID, either way, continue down the
- // handler stack.
- h.next.ServeHTTP(w, req)
- return
+ }
+
+ return false
+}
+
+func fetchRemoteCollectionByPDH(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool {
+
+ if effectiveMethod != "GET" {
+ // Only handle GET requests right now
+ return false
+ }
+
+ m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path)
+ if len(m) != 2 {
+ return false
}
// Request for collection by PDH. Search the federation.
// First, query the local cluster.
- resp, localClusterRequestCancel, err := h.handler.localClusterRequest(req)
- if localClusterRequestCancel != nil {
- defer localClusterRequestCancel()
- }
+ resp, err := h.handler.localClusterRequest(req)
newResp, err := filterLocalClusterResponse(resp, err)
if newResp != nil || err != nil {
h.handler.proxy.ForwardResponse(w, newResp, err)
- return
+ return true
}
// Create a goroutine for each cluster in the
// returned to the client. When that happens, all
// other outstanding requests are cancelled
sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+
req = req.WithContext(sharedContext)
wg := sync.WaitGroup{}
pdh := m[1]
success := make(chan *http.Response)
- errorChan := make(chan error)
+ errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-
- defer close(errorChan)
- defer close(success)
- defer close(sem)
- defer cancelFunc()
+ acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
for remoteID := range h.handler.Cluster.RemoteClusters {
if remoteID == h.handler.Cluster.ClusterID {
// No need to query local cluster again
continue
}
+ if remoteID == "*" {
+ // This isn't a real remote cluster: it just sets defaults for unlisted remotes.
+ continue
+ }
wg.Add(1)
go func(remote string) {
defer wg.Done()
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
+ acquire()
+ defer release()
select {
case <-sharedContext.Done():
return
default:
}
- resp, _, err := h.handler.remoteClusterRequest(remote, req)
+ resp, err := h.handler.remoteClusterRequest(remote, req)
wasSuccess := false
defer func() {
if resp != nil && !wasSuccess {
resp.Body.Close()
}
}()
- // Don't need to do anything with the cancel
- // function returned by remoteClusterRequest
- // because the context inherits from
- // sharedContext, so when sharedContext is
- // cancelled it should cancel that one as
- // well.
if err != nil {
errorChan <- err
return
case success <- newResponse:
wasSuccess = true
}
- <-sem
}(remoteID)
}
go func() {
cancelFunc()
}()
- var errors []string
errorCode := http.StatusNotFound
for {
select {
case newResp = <-success:
h.handler.proxy.ForwardResponse(w, newResp, nil)
- return
- case err := <-errorChan:
- if httperr, ok := err.(HTTPError); ok {
- if httperr.Code != http.StatusNotFound {
+ return true
+ case <-sharedContext.Done():
+ var errors []string
+ for len(errorChan) > 0 {
+ err := <-errorChan
+ if httperr, ok := err.(HTTPError); !ok || httperr.Code != http.StatusNotFound {
errorCode = http.StatusBadGateway
}
+ errors = append(errors, err.Error())
}
- errors = append(errors, err.Error())
- case <-sharedContext.Done():
httpserver.Errors(w, errors, errorCode)
- return
+ return true
}
}
+
+ // shouldn't ever get here
+ return true
}