"regexp"
"sync"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
)
type federatedRequestDelegate func(
h *genericFederatedRequestHandler,
effectiveMethod string,
- clusterId *string,
+ clusterID *string,
uuid string,
remainder string,
w http.ResponseWriter,
clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
found := make(map[string]bool)
- prev_len_uuids := len(uuids) + 1
+ prevLenUuids := len(uuids) + 1
// Loop while
// (1) there are more uuids to query
// (2) we're making progress - on each iteration the set of
// uuids we are still waiting for must shrink.
- for len(uuids) > 0 && len(uuids) < prev_len_uuids {
+ for len(uuids) > 0 && len(uuids) < prevLenUuids {
var remoteReq http.Request
remoteReq.Header = req.Header
remoteReq.Method = "POST"
l = append(l, u)
}
}
- prev_len_uuids = len(uuids)
+ prevLenUuids = len(uuids)
uuids = l
}
}
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
- req *http.Request, clusterId *string) bool {
+ req *http.Request, clusterID *string) bool {
var filters [][]interface{}
err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
if op == "in" {
if rhs, ok := filter[2].([]interface{}); ok {
for _, i := range rhs {
- if u, ok := i.(string); ok {
- *clusterId = u[0:5]
+ if u, ok := i.(string); ok && len(u) == 27 {
+ *clusterID = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
+ expectCount++
}
}
}
} else if op == "=" {
- if u, ok := filter[2].(string); ok {
- *clusterId = u[0:5]
+ if u, ok := filter[2].(string); ok && len(u) == 27 {
+ *clusterID = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
+ expectCount++
}
} else {
return false
httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
return true
}
- if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+ if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
- expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+ expectCount, max), http.StatusBadRequest)
return true
}
if req.Form.Get("select") != "" {
// Perform concurrent requests to each cluster
- // use channel as a semaphore to limit the number of concurrent
- // requests at a time
- sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
- defer close(sem)
+ acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
wg := sync.WaitGroup{}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// Nothing to query
continue
}
-
- // blocks until it can put a value into the
- // channel (which has a max queue capacity)
- sem <- true
+ acquire()
wg.Add(1)
go func(k string, v []string) {
+ defer release()
+ defer wg.Done()
rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
mtx.Lock()
+ defer mtx.Unlock()
if err == nil {
completeResponses = append(completeResponses, rp...)
kind = kn
} else {
errors = append(errors, err)
}
- mtx.Unlock()
- wg.Done()
- <-sem
}(k, v)
}
wg.Wait()
func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
m := h.matcher.FindStringSubmatch(req.URL.Path)
- clusterId := ""
+ clusterID := ""
if len(m) > 0 && m[2] != "" {
- clusterId = m[2]
+ clusterID = m[2]
}
// Get form parameters from URL and form body (if POST).
// Check if the parameters have an explicit cluster_id
if req.Form.Get("cluster_id") != "" {
- clusterId = req.Form.Get("cluster_id")
+ clusterID = req.Form.Get("cluster_id")
}
// Handle the POST-as-GET special case (workaround for large
}
if effectiveMethod == "GET" &&
- clusterId == "" &&
+ clusterID == "" &&
req.Form.Get("filters") != "" &&
- h.handleMultiClusterQuery(w, req, &clusterId) {
+ h.handleMultiClusterQuery(w, req, &clusterID) {
return
}
+ var uuid string
+ if len(m[1]) > 0 {
+ // trim leading slash
+ uuid = m[1][1:]
+ }
for _, d := range h.delegates {
- if d(h, effectiveMethod, &clusterId, m[1], m[3], w, req) {
+ if d(h, effectiveMethod, &clusterID, uuid, m[3], w, req) {
return
}
}
- if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
+ if clusterID == "" || clusterID == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
} else {
- resp, err := h.handler.remoteClusterRequest(clusterId, req)
+ resp, err := h.handler.remoteClusterRequest(clusterID, req)
h.handler.proxy.ForwardResponse(w, resp, err)
}
}