h.proxy.Do(w, req, urlOut, client, filter)
}
-// loadParamsFromForm expects a request with
-// application/x-www-form-urlencoded body. It parses the query, adds
-// the query parameters to "params", and replaces the request body
-// with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromForm(req *http.Request, params url.Values) error {
- body, err := ioutil.ReadAll(req.Body)
- if err != nil {
- return err
- }
- req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
- var v2 url.Values
- if v2, err = url.ParseQuery(string(body)); err != nil {
- return err
- }
- for k, v := range v2 {
- params[k] = append(params[k], v...)
- }
- return nil
-}
-
-// loadParamsFromForm expects a request with application/json body.
-// It parses the body, populates "loadInto", and replaces the request
-// body with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
-	var cl int64
-	if req.ContentLength > 0 {
-		cl = req.ContentLength
-	}
-	postBody := bytes.NewBuffer(make([]byte, 0, cl))
-	defer req.Body.Close()
-
-	rdr := io.TeeReader(req.Body, postBody)
-	err := json.NewDecoder(rdr).Decode(loadInto)
-	if err != nil {
-		return err
-	}
-	req.Body = ioutil.NopCloser(postBody)
-	return nil
-}
+// loadParamsFromForm parses the URL query and, when net/http treats
+// the request body as a form, the body as well, leaving the result
+// in req.Form.
+//
+// Every byte ParseForm reads from the body is captured in a buffer,
+// and req.Body is then replaced with a reader that replays the
+// captured bytes followed by whatever ParseForm left unread, so
+// downstream proxy steps still see the complete body. The capture is
+// unconditional rather than keyed on an exact Content-Type match:
+// ParseForm also consumes bodies whose Content-Type carries
+// parameters (e.g. "; charset=utf-8") and ignores the body for
+// methods such as GET, and in both cases the body must survive
+// intact.
+//
+// The body is restored even when ParseForm fails; the ParseForm
+// error, if any, is returned.
+func loadParamsFromForm(req *http.Request) error {
+	if req.Body == nil {
+		// Nothing to preserve; only the URL query can contribute.
+		return req.ParseForm()
+	}
+
+	originalBody := req.Body
+	// The buffer grows on demand instead of being pre-sized from the
+	// client-supplied Content-Length, which is untrusted.
+	var consumed bytes.Buffer
+	req.Body = ioutil.NopCloser(io.TeeReader(originalBody, &consumed))
+	err := req.ParseForm()
+
+	// Replay what was consumed, then continue with the unread
+	// remainder. Close is forwarded to the original body, which must
+	// stay open because part of it may not have been read yet.
+	req.Body = struct {
+		io.Reader
+		io.Closer
+	}{
+		Reader: io.MultiReader(&consumed, originalBody),
+		Closer: originalBody,
+	}
+	return err
+}
+// multiClusterQueryResponseCollector holds the outcome of one remote
+// cluster query. On success, responses and kind carry the "items"
+// and "kind" members of the decoded JSON reply; on failure, error
+// describes what went wrong and names clusterID, the cluster that was
+// queried.
type multiClusterQueryResponseCollector struct {
- responses []interface{}
+ responses []map[string]interface{}
error error
kind string
clusterID string
}
defer resp.Body.Close()
- loadInto := make(map[string]interface{})
+ var loadInto struct {
+ Kind string `json:"kind"`
+ Items []map[string]interface{} `json:"items"`
+ Errors []string `json:"errors"`
+ }
err = json.NewDecoder(resp.Body).Decode(&loadInto)
- if err == nil {
- if resp.StatusCode != http.StatusOK {
- c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto["errors"])
- } else {
- c.responses = loadInto["items"].([]interface{})
- c.kind, _ = loadInto["kind"].(string)
- }
- } else {
+ if err != nil {
c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+ return nil, nil
+ }
+ if resp.StatusCode != http.StatusOK {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+ return nil, nil
}
+ c.responses = loadInto.Items
+ c.kind = loadInto.Kind
+
return nil, nil
}
func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
- req *http.Request, params url.Values,
- clusterID string, uuids []string) (rp []interface{}, kind string, err error) {
+ req *http.Request,
+ clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
found := make(map[string]bool)
- for len(uuids) > 0 {
+ prev_len_uuids := len(uuids) + 1
+ // Loop while
+ // (1) there are more uuids to query
+ // (2) we're making progress - on each iteration the set of
+ // uuids we are still waiting for must shrink.
+ for len(uuids) > 0 && len(uuids) < prev_len_uuids {
var remoteReq http.Request
remoteReq.Header = req.Header
remoteReq.Method = "POST"
remoteReq.URL = &url.URL{Path: req.URL.Path}
remoteParams := make(url.Values)
- remoteParams["_method"] = []string{"GET"}
- remoteParams["count"] = []string{"none"}
- if len(params["select"]) != 0 {
- remoteParams["select"] = params["select"]
+ remoteParams.Set("_method", "GET")
+ remoteParams.Set("count", "none")
+ if req.Form.Get("select") != "" {
+ remoteParams.Set("select", req.Form.Get("select"))
}
content, err := json.Marshal(uuids)
if err != nil {
// around and do another request with just the
// stragglers.
for _, i := range rc.responses {
- m, ok := i.(map[string]interface{})
+ uuid, ok := i["uuid"].(string)
if ok {
- uuid, ok := m["uuid"].(string)
- if ok {
- found[uuid] = true
- }
+ found[uuid] = true
}
}
l = append(l, u)
}
}
+ prev_len_uuids = len(uuids)
uuids = l
}
return rp, kind, nil
}
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
- params url.Values, clusterId *string) bool {
+// FederatedRequestConcurrency returns the maximum number of
+// concurrent requests to issue to remote clusters while serving a
+// single federated request. It returns the configured
+// Cluster.FederatedRequestConcurrency when that is positive and 4
+// otherwise.
+//
+// Non-positive values fall back to the default because the result is
+// used as a channel capacity, and make(chan T, n) panics at run time
+// when n is negative.
+func (h *Handler) FederatedRequestConcurrency() int {
+	if n := h.Cluster.FederatedRequestConcurrency; n > 0 {
+		return n
+	}
+	return 4
+}
+
+// MaxItemsPerResponse returns the configured
+// Cluster.MaxItemsPerResponse, or 1000 when that setting is zero
+// (unset).
+func (h *Handler) MaxItemsPerResponse() int {
+	if limit := h.Cluster.MaxItemsPerResponse; limit != 0 {
+		return limit
+	}
+	return 1000
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+ req *http.Request, clusterId *string) bool {
var filters [][]interface{}
- err := json.Unmarshal([]byte(params["filters"][0]), &filters)
+ err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return true
// Split the list of uuids by prefix
queryClusters := make(map[string][]string)
expectCount := 0
- for _, f1 := range filters {
- if len(f1) != 3 {
+ for _, filter := range filters {
+ if len(filter) != 3 {
+ return false
+ }
+
+ if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+ return false
+ }
+
+ op, ok := filter[1].(string)
+ if !ok {
return false
}
- lhs, ok := f1[0].(string)
- if ok && lhs == "uuid" {
- op, ok := f1[1].(string)
- if !ok {
- return false
- }
- if op == "in" {
- rhs, ok := f1[2].([]interface{})
- if ok {
- for _, i := range rhs {
- u := i.(string)
+ if op == "in" {
+ if rhs, ok := filter[2].([]interface{}); ok {
+ for _, i := range rhs {
+ if u, ok := i.(string); ok {
*clusterId = u[0:5]
queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
}
- expectCount += len(rhs)
}
- } else if op == "=" {
- u, ok := f1[2].(string)
- if ok {
- *clusterId = u[0:5]
- queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
- expectCount += 1
- }
- } else {
- return false
+ }
+ } else if op == "=" {
+ if u, ok := filter[2].(string); ok {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
}
} else {
return false
}
+
}
if len(queryClusters) <= 1 {
}
// Validations
- if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
- params["count"][0] == `"none"`)) {
+ count := req.Form.Get("count")
+ if count != "" && count != `none` && count != `"none"` {
httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
return true
}
- if len(params["limit"]) != 0 || len(params["offset"]) != 0 || len(params["order"]) != 0 {
+ if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
return true
}
- if expectCount > h.handler.Cluster.MaxItemsPerResponse {
+ if expectCount > h.handler.MaxItemsPerResponse() {
httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
- expectCount, h.handler.Cluster.MaxItemsPerResponse), http.StatusBadRequest)
+ expectCount, h.handler.MaxItemsPerResponse()), http.StatusBadRequest)
return true
}
- if len(params["select"]) == 1 {
+ if req.Form.Get("select") != "" {
foundUUID := false
- var selects []interface{}
- err := json.Unmarshal([]byte(params["select"][0]), &selects)
+ var selects []string
+ err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return true
}
for _, r := range selects {
- if r.(string) == "uuid" {
+ if r == "uuid" {
foundUUID = true
break
}
}
}
- // Perform parallel requests to each cluster
+ // Perform concurrent requests to each cluster
- // use channel as a semaphore to limit the number of parallel
+ // use channel as a semaphore to limit the number of concurrent
// requests at a time
- sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+ sem := make(chan bool, h.handler.FederatedRequestConcurrency())
defer close(sem)
wg := sync.WaitGroup{}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
mtx := sync.Mutex{}
errors := []error{}
- var completeResponses []interface{}
+ var completeResponses []map[string]interface{}
var kind string
for k, v := range queryClusters {
sem <- true
wg.Add(1)
go func(k string, v []string) {
- rp, kn, err := h.remoteQueryUUIDs(w, req, params, k, v)
+ rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
mtx.Lock()
if err == nil {
completeResponses = append(completeResponses, rp...)
clusterId = m[2]
}
- // First, parse the query portion of the URL.
- var params url.Values
- var err error
- if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
+ // Get form parameters from URL and form body (if POST).
+ if err := loadParamsFromForm(req); err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return
}
- // Next, if appropriate, merge in parameters from the form POST body.
- if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
- if err = loadParamsFromForm(req, params); err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- }
-
// Check if the parameters have an explicit cluster_id
- if len(params["cluster_id"]) == 1 {
- clusterId = params["cluster_id"][0]
+ if req.Form.Get("cluster_id") != "" {
+ clusterId = req.Form.Get("cluster_id")
}
// Handle the POST-as-GET special case (workaround for large
// like multi-object queries where the filter has 100s of
// items)
effectiveMethod := req.Method
- if req.Method == "POST" && len(params["_method"]) == 1 {
- effectiveMethod = params["_method"][0]
+ if req.Method == "POST" && req.Form.Get("_method") != "" {
+ effectiveMethod = req.Form.Get("_method")
}
- if effectiveMethod == "GET" && clusterId == "" && len(params["filters"]) == 1 {
- if h.handleMultiClusterQuery(w, req, params, &clusterId) {
- return
- }
+ if effectiveMethod == "GET" &&
+ clusterId == "" &&
+ req.Form.Get("filters") != "" &&
+ h.handleMultiClusterQuery(w, req, &clusterId) {
+ return
}
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
var errors []string
var errorCode int = 404
- // use channel as a semaphore to limit the number of parallel
+ // use channel as a semaphore to limit the number of concurrent
// requests at a time
- sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+ sem := make(chan bool, h.handler.FederatedRequestConcurrency())
defer close(sem)
for remoteID := range h.handler.Cluster.RemoteClusters {
// blocks until it can put a value into the