projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
16718: Merge branch 'master' into 16718-group-contents-collection-versions
[arvados.git]
/
lib
/
controller
/
fed_generic.go
diff --git a/lib/controller/fed_generic.go b/lib/controller/fed_generic.go
index 0630217b6e6ac0aae1f6b26893ef4e6bd855832a..476fd97b05cd1c8a10ded9aaf43ef6b21744443c 100644
(file)
--- a/lib/controller/fed_generic.go
+++ b/lib/controller/fed_generic.go
@@ -14,13 +14,23 @@
import (
"regexp"
"sync"
"regexp"
"sync"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
)
)
+type federatedRequestDelegate func(
+ h *genericFederatedRequestHandler,
+ effectiveMethod string,
+ clusterId *string,
+ uuid string,
+ remainder string,
+ w http.ResponseWriter,
+ req *http.Request) bool
+
type genericFederatedRequestHandler struct {
type genericFederatedRequestHandler struct {
- next http.Handler
- handler *Handler
- matcher *regexp.Regexp
+ next http.Handler
+ handler *Handler
+ matcher *regexp.Regexp
+ delegates []federatedRequestDelegate
}
func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
}
func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
@@ -130,7 +140,7 @@
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
if op == "in" {
if rhs, ok := filter[2].([]interface{}); ok {
for _, i := range rhs {
if op == "in" {
if rhs, ok := filter[2].([]interface{}); ok {
for _, i := range rhs {
- if u, ok := i.(string); ok {
+ if u, ok := i.(string); ok && len(u) == 27 {
*clusterId = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
expectCount += 1
*clusterId = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
expectCount += 1
@@ -138,7 +148,7 @@
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
}
}
} else if op == "=" {
}
}
} else if op == "=" {
- if u, ok := filter[2].(string); ok {
+ if u, ok := filter[2].(string); ok && len(u) == 27 {
*clusterId = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
expectCount += 1
*clusterId = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
expectCount += 1
@@ -165,9 +175,9 @@
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
return true
}
httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
return true
}
- if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+ if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
- expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+ expectCount, max), http.StatusBadRequest)
return true
}
if req.Form.Get("select") != "" {
return true
}
if req.Form.Get("select") != "" {
@@ -193,10 +203,7 @@
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
// Perform concurrent requests to each cluster
// Perform concurrent requests to each cluster
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
- defer close(sem)
+ acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
wg := sync.WaitGroup{}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
wg := sync.WaitGroup{}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@@ -210,23 +217,20 @@
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
// Nothing to query
continue
}
// Nothing to query
continue
}
-
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
+ acquire()
wg.Add(1)
go func(k string, v []string) {
wg.Add(1)
go func(k string, v []string) {
+ defer release()
+ defer wg.Done()
rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
mtx.Lock()
rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
mtx.Lock()
+ defer mtx.Unlock()
if err == nil {
completeResponses = append(completeResponses, rp...)
kind = kn
} else {
errors = append(errors, err)
}
if err == nil {
completeResponses = append(completeResponses, rp...)
kind = kn
} else {
errors = append(errors, err)
}
- mtx.Unlock()
- wg.Done()
- <-sem
}(k, v)
}
wg.Wait()
}(k, v)
}
wg.Wait()
@@ -285,6 +289,17 @@
func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
return
}
return
}
+ var uuid string
+ if len(m[1]) > 0 {
+ // trim leading slash
+ uuid = m[1][1:]
+ }
+ for _, d := range h.delegates {
+ if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) {
+ return
+ }
+ }
+
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
} else {
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
} else {