X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/16f704326f44fd1e5e5e60b936c9b5895d6a6ff8..d05b16ce4d4eea6841bf105081191f9964b485b7:/lib/controller/fed_generic.go diff --git a/lib/controller/fed_generic.go b/lib/controller/fed_generic.go index 9c8b1614bc..fc2d96cc55 100644 --- a/lib/controller/fed_generic.go +++ b/lib/controller/fed_generic.go @@ -14,13 +14,13 @@ import ( "regexp" "sync" - "git.curoverse.com/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/httpserver" ) type federatedRequestDelegate func( h *genericFederatedRequestHandler, effectiveMethod string, - clusterId *string, + clusterID *string, uuid string, remainder string, w http.ResponseWriter, @@ -38,12 +38,12 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter, clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) { found := make(map[string]bool) - prev_len_uuids := len(uuids) + 1 + prevLenUuids := len(uuids) + 1 // Loop while // (1) there are more uuids to query // (2) we're making progress - on each iteration the set of // uuids we are expecting for must shrink. - for len(uuids) > 0 && len(uuids) < prev_len_uuids { + for len(uuids) > 0 && len(uuids) < prevLenUuids { var remoteReq http.Request remoteReq.Header = req.Header remoteReq.Method = "POST" @@ -103,7 +103,7 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter, l = append(l, u) } } - prev_len_uuids = len(uuids) + prevLenUuids = len(uuids) uuids = l } @@ -111,7 +111,7 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter, } func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, - req *http.Request, clusterId *string) bool { + req *http.Request, clusterID *string) bool { var filters [][]interface{} err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters) @@ -141,17 +141,17 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response if rhs, ok := filter[2].([]interface{}); ok { for _, i := range rhs { if u, ok := i.(string); ok && len(u) == 27 { - *clusterId = u[0:5] + *clusterID = u[0:5] queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u) - expectCount += 1 + expectCount++ } } } } else if op == "=" { if u, ok := filter[2].(string); ok && len(u) == 27 { - *clusterId = u[0:5] + *clusterID = u[0:5] queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u) - expectCount += 1 + expectCount++ } } else { return false @@ -175,9 +175,9 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest) return true } - if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() { + if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max { httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.", - expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest) + expectCount, max), http.StatusBadRequest) return true } if req.Form.Get("select") != "" { @@ -203,10 +203,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response // Perform concurrent requests to each cluster - // use channel as a semaphore to limit the number of concurrent - // requests at a time - sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency()) - defer close(sem) + acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification) wg := sync.WaitGroup{} req.Header.Set("Content-Type", "application/x-www-form-urlencoded") @@ -220,23 +217,20 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response // Nothing to query continue } - - // blocks until it can put a value into the - // channel (which has a max queue capacity) - sem <- true + acquire() wg.Add(1) go func(k string, v []string) { + defer release() + defer wg.Done() rp, kn, err := h.remoteQueryUUIDs(w, req, k, v) mtx.Lock() + defer mtx.Unlock() if err == nil { completeResponses = append(completeResponses, rp...) kind = kn } else { errors = append(errors, err) } - mtx.Unlock() - wg.Done() - <-sem }(k, v) } wg.Wait() @@ -262,10 +256,10 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { m := h.matcher.FindStringSubmatch(req.URL.Path) - clusterId := "" + clusterID := "" if len(m) > 0 && m[2] != "" { - clusterId = m[2] + clusterID = m[2] } // Get form parameters from URL and form body (if POST). @@ -276,7 +270,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h // Check if the parameters have an explicit cluster_id if req.Form.Get("cluster_id") != "" { - clusterId = req.Form.Get("cluster_id") + clusterID = req.Form.Get("cluster_id") } // Handle the POST-as-GET special case (workaround for large @@ -289,9 +283,9 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h } if effectiveMethod == "GET" && - clusterId == "" && + clusterID == "" && req.Form.Get("filters") != "" && - h.handleMultiClusterQuery(w, req, &clusterId) { + h.handleMultiClusterQuery(w, req, &clusterID) { return } @@ -301,15 +295,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h uuid = m[1][1:] } for _, d := range h.delegates { - if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) { + if d(h, effectiveMethod, &clusterID, uuid, m[3], w, req) { return } } - if clusterId == "" || clusterId == h.handler.Cluster.ClusterID { + if clusterID == "" || clusterID == h.handler.Cluster.ClusterID { h.next.ServeHTTP(w, req) } else { - resp, err := h.handler.remoteClusterRequest(clusterId, req) + resp, err := h.handler.remoteClusterRequest(clusterID, req) h.handler.proxy.ForwardResponse(w, resp, err) } }