h.proxy.Do(w, req, urlOut, client, filter)
}
+// loadParamsFromForm expects a request with an
+// application/x-www-form-urlencoded body. It parses the body as a
+// query string, adds the resulting parameters to "params", and
+// replaces the request body with a buffer holding the original body
+// contents so it can be re-read by downstream proxy steps.
func loadParamsFromForm(req *http.Request, params url.Values) error {
body, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil
}
+// loadParamsFromJson expects a request with an application/json body.
+// It parses the body, populates "loadInto", and replaces the request
+// body with a buffer holding the original body contents so it can be
+// re-read by downstream proxy steps.
func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
var cl int64
if req.ContentLength > 0 {
return nil
}
-type responseCollector struct {
+type multiClusterQueryResponseCollector struct {
mtx sync.Mutex // locked by collectResponse and by the query handler before touching the collected results
responses []interface{} // accumulated entries taken from each decoded response's "items" list; written out as "items" in the merged reply
errors []error // errors gathered while collecting responses (request errors and error payloads reported in a response)
+ kind string // "kind" value copied from a decoded response; written out as "kind" in the merged reply
}
-func (c *responseCollector) collectResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+ requestError error) (newResponse *http.Response, err error) {
if requestError != nil {
c.mtx.Lock()
defer c.mtx.Unlock()
c.errors = append(c.errors, fmt.Errorf("error %v", loadInto["errors"]))
} else {
c.responses = append(c.responses, loadInto["items"].([]interface{})...)
+ c.kind = loadInto["kind"].(string)
}
} else {
c.errors = append(c.errors, err)
func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
params url.Values, clusterId *string) bool {
+ if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
+ params["count"][0] == `"none"`)) {
+ // don't federate unless params has count=none
+ return false
+ }
+
var filters [][]interface{}
err := json.Unmarshal([]byte(params["filters"][0]), &filters)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
return true
}
+
+ // Split the list of uuids by prefix
queryClusters := make(map[string][]string)
if len(filters) == 1 && len(filters[0]) == 3 {
f1 := filters[0]
}
}
- if len(queryClusters) <= 1 {
+ if len(queryClusters) == 0 {
+ // Didn't find any ["uuid", "in", ...] filters
return false
}
wg := sync.WaitGroup{}
- //var errors []string
- //var errorCode int = 404
// use channel as a semaphore to limit it to 4
// parallel requests at a time
defer close(sem)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
- rc := responseCollector{}
+ rc := multiClusterQueryResponseCollector{}
for k, v := range queryClusters {
// blocks until it can put a value into the
// channel (which has a max queue capacity)
remoteReq.URL = &url.URL{Path: req.URL.Path}
remoteParams := make(url.Values)
remoteParams["_method"] = []string{"GET"}
+ remoteParams["count"] = []string{"none"}
content, err := json.Marshal(v)
if err != nil {
rc.mtx.Lock()
w.WriteHeader(http.StatusOK)
itemList := make(map[string]interface{})
itemList["items"] = rc.responses
+ itemList["kind"] = rc.kind
json.NewEncoder(w).Encode(itemList)
}
clusterId = m[2]
}
+ // First, parse the query portion of the URL.
var params url.Values
var err error
if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
return
}
+ // Next, if appropriate, merge in parameters from the form POST body.
if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
if err = loadParamsFromForm(req, params); err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
}
}
+ // Check if the parameters have an explicit cluster_id
if len(params["cluster_id"]) == 1 {
clusterId = params["cluster_id"][0]
}
+ // TODO: decide if this actually makes sense...
if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
var hasClusterId struct {
ClusterID string `json:"cluster_id"`
return
}
}
- //log.Printf("Clusterid is %q", clusterId)
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
defer s.localServiceReturns404(c).Close()
- req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+ req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v"]]]`, arvadostest.QueuedContainerUUID)), nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
resp := s.testRequest(req)
func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
bd, _ := ioutil.ReadAll(req.Body)
- c.Check(string(bd), check.Equals, `_method=GET&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
+ c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
w.WriteHeader(200)
- w.Write([]byte(`{"items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
+ w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
})).Close()
- req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+ req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`, arvadostest.QueuedContainerUUID)), nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
resp := s.testRequest(req)