Merge branch '16989-user-setup' refs #16989
[arvados.git] / lib / controller / fed_generic.go
index 0630217b6e6ac0aae1f6b26893ef4e6bd855832a..476fd97b05cd1c8a10ded9aaf43ef6b21744443c 100644 (file)
@@ -14,13 +14,23 @@ import (
        "regexp"
        "sync"
 
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
+type federatedRequestDelegate func(
+       h *genericFederatedRequestHandler,
+       effectiveMethod string,
+       clusterId *string,
+       uuid string,
+       remainder string,
+       w http.ResponseWriter,
+       req *http.Request) bool
+
 type genericFederatedRequestHandler struct {
-       next    http.Handler
-       handler *Handler
-       matcher *regexp.Regexp
+       next      http.Handler
+       handler   *Handler
+       matcher   *regexp.Regexp
+       delegates []federatedRequestDelegate
 }
 
 func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
@@ -130,7 +140,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                if op == "in" {
                        if rhs, ok := filter[2].([]interface{}); ok {
                                for _, i := range rhs {
-                                       if u, ok := i.(string); ok {
+                                       if u, ok := i.(string); ok && len(u) == 27 {
                                                *clusterId = u[0:5]
                                                queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
                                                expectCount += 1
@@ -138,7 +148,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                                }
                        }
                } else if op == "=" {
-                       if u, ok := filter[2].(string); ok {
+                       if u, ok := filter[2].(string); ok && len(u) == 27 {
                                *clusterId = u[0:5]
                                queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
                                expectCount += 1
@@ -165,9 +175,9 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
                return true
        }
-       if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+       if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
                httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-                       expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+                       expectCount, max), http.StatusBadRequest)
                return true
        }
        if req.Form.Get("select") != "" {
@@ -193,10 +203,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
 
        // Perform concurrent requests to each cluster
 
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
+       acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
        wg := sync.WaitGroup{}
 
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@@ -210,23 +217,20 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                        // Nothing to query
                        continue
                }
-
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
+               acquire()
                wg.Add(1)
                go func(k string, v []string) {
+                       defer release()
+                       defer wg.Done()
                        rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
                        mtx.Lock()
+                       defer mtx.Unlock()
                        if err == nil {
                                completeResponses = append(completeResponses, rp...)
                                kind = kn
                        } else {
                                errors = append(errors, err)
                        }
-                       mtx.Unlock()
-                       wg.Done()
-                       <-sem
                }(k, v)
        }
        wg.Wait()
@@ -285,6 +289,17 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
                return
        }
 
+       var uuid string
+       if len(m[1]) > 0 {
+               // trim leading slash
+               uuid = m[1][1:]
+       }
+       for _, d := range h.delegates {
+               if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) {
+                       return
+               }
+       }
+
        if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
                h.next.ServeHTTP(w, req)
        } else {