Merge branch '16217-ws-ping'
[arvados.git] / lib / controller / fed_generic.go
index 63e61e6908f8b318ead4e151bd13dee302c815d3..476fd97b05cd1c8a10ded9aaf43ef6b21744443c 100644 (file)
@@ -14,7 +14,7 @@ import (
        "regexp"
        "sync"
 
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
 type federatedRequestDelegate func(
@@ -140,7 +140,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                if op == "in" {
                        if rhs, ok := filter[2].([]interface{}); ok {
                                for _, i := range rhs {
-                                       if u, ok := i.(string); ok {
+                                       if u, ok := i.(string); ok && len(u) == 27 {
                                                *clusterId = u[0:5]
                                                queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
                                                expectCount += 1
@@ -148,7 +148,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                                }
                        }
                } else if op == "=" {
-                       if u, ok := filter[2].(string); ok {
+                       if u, ok := filter[2].(string); ok && len(u) == 27 {
                                *clusterId = u[0:5]
                                queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
                                expectCount += 1
@@ -175,9 +175,9 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
                return true
        }
-       if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+       if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
                httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-                       expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+                       expectCount, max), http.StatusBadRequest)
                return true
        }
        if req.Form.Get("select") != "" {
@@ -203,10 +203,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 
        // Perform concurrent requests to each cluster
 
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
        wg := sync.WaitGroup{}
 
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@@ -220,23 +217,20 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                        // Nothing to query
                        continue
                }
-
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
+               acquire()
                wg.Add(1)
                go func(k string, v []string) {
+                       defer release()
+                       defer wg.Done()
                        rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
                        mtx.Lock()
+                       defer mtx.Unlock()
                        if err == nil {
                                completeResponses = append(completeResponses, rp...)
                                kind = kn
                        } else {
                                errors = append(errors, err)
                        }
-                       mtx.Unlock()
-                       wg.Done()
-                       <-sem
                }(k, v)
        }
        wg.Wait()
@@ -295,8 +289,13 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
                return
        }
 
+       var uuid string
+       if len(m[1]) > 0 {
+               // trim leading slash
+               uuid = m[1][1:]
+       }
        for _, d := range h.delegates {
-               if d(h, effectiveMethod, &clusterId, m[1], m[3], w, req) {
+               if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) {
                        return
                }
        }