19146: Remove unneeded special case checks, explain the needed one.
[arvados.git] / lib / controller / fed_generic.go
index 63e61e6908f8b318ead4e151bd13dee302c815d3..fc2d96cc55fb5f4f0be7e46f55ee3f70445078a3 100644 (file)
@@ -14,13 +14,13 @@ import (
        "regexp"
        "sync"
 
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
 type federatedRequestDelegate func(
        h *genericFederatedRequestHandler,
        effectiveMethod string,
-       clusterId *string,
+       clusterID *string,
        uuid string,
        remainder string,
        w http.ResponseWriter,
@@ -38,12 +38,12 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
        clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
 
        found := make(map[string]bool)
-       prev_len_uuids := len(uuids) + 1
+       prevLenUuids := len(uuids) + 1
        // Loop while
        // (1) there are more uuids to query
        // (2) we're making progress - on each iteration the set of
        // uuids we are expecting for must shrink.
-       for len(uuids) > 0 && len(uuids) < prev_len_uuids {
+       for len(uuids) > 0 && len(uuids) < prevLenUuids {
                var remoteReq http.Request
                remoteReq.Header = req.Header
                remoteReq.Method = "POST"
@@ -103,7 +103,7 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
                                l = append(l, u)
                        }
                }
-               prev_len_uuids = len(uuids)
+               prevLenUuids = len(uuids)
                uuids = l
        }
 
@@ -111,7 +111,7 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
 }
 
 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
-       req *http.Request, clusterId *string) bool {
+       req *http.Request, clusterID *string) bool {
 
        var filters [][]interface{}
        err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
@@ -140,18 +140,18 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                if op == "in" {
                        if rhs, ok := filter[2].([]interface{}); ok {
                                for _, i := range rhs {
-                                       if u, ok := i.(string); ok {
-                                               *clusterId = u[0:5]
+                                       if u, ok := i.(string); ok && len(u) == 27 {
+                                               *clusterID = u[0:5]
                                                queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-                                               expectCount += 1
+                                               expectCount++
                                        }
                                }
                        }
                } else if op == "=" {
-                       if u, ok := filter[2].(string); ok {
-                               *clusterId = u[0:5]
+                       if u, ok := filter[2].(string); ok && len(u) == 27 {
+                               *clusterID = u[0:5]
                                queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-                               expectCount += 1
+                               expectCount++
                        }
                } else {
                        return false
@@ -175,9 +175,9 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
                return true
        }
-       if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+       if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
                httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-                       expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+                       expectCount, max), http.StatusBadRequest)
                return true
        }
        if req.Form.Get("select") != "" {
@@ -203,10 +203,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 
        // Perform concurrent requests to each cluster
 
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
        wg := sync.WaitGroup{}
 
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@@ -220,23 +217,20 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                        // Nothing to query
                        continue
                }
-
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
+               acquire()
                wg.Add(1)
                go func(k string, v []string) {
+                       defer release()
+                       defer wg.Done()
                        rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
                        mtx.Lock()
+                       defer mtx.Unlock()
                        if err == nil {
                                completeResponses = append(completeResponses, rp...)
                                kind = kn
                        } else {
                                errors = append(errors, err)
                        }
-                       mtx.Unlock()
-                       wg.Done()
-                       <-sem
                }(k, v)
        }
        wg.Wait()
@@ -262,10 +256,10 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 
 func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        m := h.matcher.FindStringSubmatch(req.URL.Path)
-       clusterId := ""
+       clusterID := ""
 
        if len(m) > 0 && m[2] != "" {
-               clusterId = m[2]
+               clusterID = m[2]
        }
 
        // Get form parameters from URL and form body (if POST).
@@ -276,7 +270,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
 
        // Check if the parameters have an explicit cluster_id
        if req.Form.Get("cluster_id") != "" {
-               clusterId = req.Form.Get("cluster_id")
+               clusterID = req.Form.Get("cluster_id")
        }
 
        // Handle the POST-as-GET special case (workaround for large
@@ -289,22 +283,27 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
        }
 
        if effectiveMethod == "GET" &&
-               clusterId == "" &&
+               clusterID == "" &&
                req.Form.Get("filters") != "" &&
-               h.handleMultiClusterQuery(w, req, &clusterId) {
+               h.handleMultiClusterQuery(w, req, &clusterID) {
                return
        }
 
+       var uuid string
+       if len(m[1]) > 0 {
+               // trim leading slash
+               uuid = m[1][1:]
+       }
        for _, d := range h.delegates {
-               if d(h, effectiveMethod, &clusterId, m[1], m[3], w, req) {
+               if d(h, effectiveMethod, &clusterID, uuid, m[3], w, req) {
                        return
                }
        }
 
-       if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
+       if clusterID == "" || clusterID == h.handler.Cluster.ClusterID {
                h.next.ServeHTTP(w, req)
        } else {
-               resp, err := h.handler.remoteClusterRequest(clusterId, req)
+               resp, err := h.handler.remoteClusterRequest(clusterID, req)
                h.handler.proxy.ForwardResponse(w, resp, err)
        }
 }