13619: Clean up, require count=none
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 27 Sep 2018 18:52:22 +0000 (14:52 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 27 Sep 2018 18:52:22 +0000 (14:52 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

lib/controller/federation.go
lib/controller/federation_test.go

index caa84ca5f29073c78b8b5322222b4c23e06bb70f..04e0e74e3569d07d5d873127ada550d9ae2f552a 100644 (file)
@@ -73,6 +73,11 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
        h.proxy.Do(w, req, urlOut, client, filter)
 }
 
+// loadParamsFromForm expects a request with
+// application/x-www-form-urlencoded body.  It parses the query, adds
+// the query parameters to "params", and replaces the request body
+// with a buffer holding the original body contents so it can be
+// re-read by downstream proxy steps.
 func loadParamsFromForm(req *http.Request, params url.Values) error {
        body, err := ioutil.ReadAll(req.Body)
        if err != nil {
@@ -89,6 +94,10 @@ func loadParamsFromForm(req *http.Request, params url.Values) error {
        return nil
 }
 
+// loadParamsFromForm expects a request with application/json body.
+// It parses the body, populates "loadInto", and replaces the request
+// body with a buffer holding the original body contents so it can be
+// re-read by downstream proxy steps.
 func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
        var cl int64
        if req.ContentLength > 0 {
@@ -107,13 +116,15 @@ func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
        return nil
 }
 
-type responseCollector struct {
+type multiClusterQueryResponseCollector struct {
        mtx       sync.Mutex
        responses []interface{}
        errors    []error
+       kind      string
 }
 
-func (c *responseCollector) collectResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+       requestError error) (newResponse *http.Response, err error) {
        if requestError != nil {
                c.mtx.Lock()
                defer c.mtx.Unlock()
@@ -132,6 +143,7 @@ func (c *responseCollector) collectResponse(resp *http.Response, requestError er
                        c.errors = append(c.errors, fmt.Errorf("error %v", loadInto["errors"]))
                } else {
                        c.responses = append(c.responses, loadInto["items"].([]interface{})...)
+                       c.kind = loadInto["kind"].(string)
                }
        } else {
                c.errors = append(c.errors, err)
@@ -143,12 +155,20 @@ func (c *responseCollector) collectResponse(resp *http.Response, requestError er
 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
        params url.Values, clusterId *string) bool {
 
+       if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
+               params["count"][0] == `"none"`)) {
+               // don't federate unless params has count=none
+               return false
+       }
+
        var filters [][]interface{}
        err := json.Unmarshal([]byte(params["filters"][0]), &filters)
        if err != nil {
                httpserver.Error(w, err.Error(), http.StatusBadRequest)
                return true
        }
+
+       // Split the list of uuids by prefix
        queryClusters := make(map[string][]string)
        if len(filters) == 1 && len(filters[0]) == 3 {
                f1 := filters[0]
@@ -164,13 +184,12 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                }
        }
 
-       if len(queryClusters) <= 1 {
+       if len(queryClusters) == 0 {
+               // Didn't find any ["uuid", "in", ...] filters
                return false
        }
 
        wg := sync.WaitGroup{}
-       //var errors []string
-       //var errorCode int = 404
 
        // use channel as a semaphore to limit it to 4
        // parallel requests at a time
@@ -178,7 +197,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
        defer close(sem)
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
 
-       rc := responseCollector{}
+       rc := multiClusterQueryResponseCollector{}
        for k, v := range queryClusters {
                // blocks until it can put a value into the
                // channel (which has a max queue capacity)
@@ -195,6 +214,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                        remoteReq.URL = &url.URL{Path: req.URL.Path}
                        remoteParams := make(url.Values)
                        remoteParams["_method"] = []string{"GET"}
+                       remoteParams["count"] = []string{"none"}
                        content, err := json.Marshal(v)
                        if err != nil {
                                rc.mtx.Lock()
@@ -229,6 +249,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                w.WriteHeader(http.StatusOK)
                itemList := make(map[string]interface{})
                itemList["items"] = rc.responses
+               itemList["kind"] = rc.kind
                json.NewEncoder(w).Encode(itemList)
        }
 
@@ -243,6 +264,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
                clusterId = m[2]
        }
 
+       // First, parse the query portion of the URL.
        var params url.Values
        var err error
        if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
@@ -250,6 +272,7 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
                return
        }
 
+       // Next, if appropriate, merge in parameters from the form POST body.
        if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
                if err = loadParamsFromForm(req, params); err != nil {
                        httpserver.Error(w, err.Error(), http.StatusBadRequest)
@@ -257,10 +280,12 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
                }
        }
 
+       // Check if the parameters have an explicit cluster_id
        if len(params["cluster_id"]) == 1 {
                clusterId = params["cluster_id"][0]
        }
 
+       // TODO: decide if this actually makes sense...
        if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
                var hasClusterId struct {
                        ClusterID string `json:"cluster_id"`
@@ -282,7 +307,6 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
                        return
                }
        }
-       //log.Printf("Clusterid is %q", clusterId)
 
        if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
                h.next.ServeHTTP(w, req)
index 113fa9eebdbce4a4870a1ca006f70c5c72d85f09..255adfe92732ffe69763c19ab251d6eb31d1975a 100644 (file)
@@ -621,7 +621,7 @@ func (s *FederationSuite) TestGetRemoteContainer(c *check.C) {
 
 func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
        defer s.localServiceReturns404(c).Close()
-       req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+       req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
                url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v"]]]`, arvadostest.QueuedContainerUUID)), nil)
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
        resp := s.testRequest(req)
@@ -634,11 +634,11 @@ func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
 func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
        defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
                bd, _ := ioutil.ReadAll(req.Body)
-               c.Check(string(bd), check.Equals, `_method=GET&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
+               c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
                w.WriteHeader(200)
-               w.Write([]byte(`{"items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
+               w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
        })).Close()
-       req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+       req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
                url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`, arvadostest.QueuedContainerUUID)), nil)
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
        resp := s.testRequest(req)