X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/703179225b04309485c0a1cefb794df6c919e84f..654560f15c684635f2331363f04ad4cb3e9663d6:/lib/controller/fed_generic.go diff --git a/lib/controller/fed_generic.go b/lib/controller/fed_generic.go index 7d5b63d310..fd2fbc226e 100644 --- a/lib/controller/fed_generic.go +++ b/lib/controller/fed_generic.go @@ -6,7 +6,6 @@ package controller import ( "bytes" - "context" "encoding/json" "fmt" "io/ioutil" @@ -66,16 +65,12 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter, rc := multiClusterQueryResponseCollector{clusterID: clusterID} var resp *http.Response - var cancel context.CancelFunc if clusterID == h.handler.Cluster.ClusterID { - resp, cancel, err = h.handler.localClusterRequest(&remoteReq) + resp, err = h.handler.localClusterRequest(&remoteReq) } else { - resp, cancel, err = h.handler.remoteClusterRequest(clusterID, &remoteReq) + resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq) } rc.collectResponse(resp, err) - if cancel != nil { - cancel() - } if rc.error != nil { return nil, "", rc.error @@ -145,7 +140,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response if op == "in" { if rhs, ok := filter[2].([]interface{}); ok { for _, i := range rhs { - if u, ok := i.(string); ok { + if u, ok := i.(string); ok && len(u) == 27 { *clusterId = u[0:5] queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u) expectCount += 1 @@ -153,7 +148,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response } } } else if op == "=" { - if u, ok := filter[2].(string); ok { + if u, ok := filter[2].(string); ok && len(u) == 27 { *clusterId = u[0:5] queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u) expectCount += 1 @@ -180,9 +175,9 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest) return true } - if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() { + if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max { httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.", - expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest) + expectCount, max), http.StatusBadRequest) return true } if req.Form.Get("select") != "" { @@ -208,10 +203,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response // Perform concurrent requests to each cluster - // use channel as a semaphore to limit the number of concurrent - // requests at a time - sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency()) - defer close(sem) + acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification) wg := sync.WaitGroup{} req.Header.Set("Content-Type", "application/x-www-form-urlencoded") @@ -225,23 +217,20 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response // Nothing to query continue } - - // blocks until it can put a value into the - // channel (which has a max queue capacity) - sem <- true + acquire() wg.Add(1) go func(k string, v []string) { + defer release() + defer wg.Done() rp, kn, err := h.remoteQueryUUIDs(w, req, k, v) mtx.Lock() + defer mtx.Unlock() if err == nil { completeResponses = append(completeResponses, rp...) kind = kn } else { errors = append(errors, err) } - mtx.Unlock() - wg.Done() - <-sem }(k, v) } wg.Wait() @@ -300,8 +289,13 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h return } + var uuid string + if len(m[1]) > 0 { + // trim leading slash + uuid = m[1][1:] + } for _, d := range h.delegates { - if d(h, effectiveMethod, &clusterId, m[1], m[3], w, req) { + if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) { return } } @@ -309,10 +303,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h if clusterId == "" || clusterId == h.handler.Cluster.ClusterID { h.next.ServeHTTP(w, req) } else { - resp, cancel, err := h.handler.remoteClusterRequest(clusterId, req) - if cancel != nil { - defer cancel() - } + resp, err := h.handler.remoteClusterRequest(clusterId, req) h.handler.proxy.ForwardResponse(w, resp, err) } }