13619: Code cleanups
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Mon, 1 Oct 2018 19:48:51 +0000 (15:48 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Mon, 1 Oct 2018 19:48:51 +0000 (15:48 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

doc/api/methods.html.textile.liquid
lib/controller/federation.go
lib/controller/federation_test.go
sdk/go/arvados/config.go

index 6f3426523b5a780347979a5923721d38198d5cc6..067018f9d132ecfa774839285730da34edccd4c2 100644 (file)
@@ -125,13 +125,13 @@ h4. Federated listing
 
 Federated listing forwards a request to multiple clusters and combines the results.  Currently only a very restricted form of the "list" method is supported.
 
-To query multiple clusters, the list method must:
+To query multiple clusters, the list request must:
 
 * Have filters only matching @[["uuid", "in", [...]]@ or @["uuid", "=", "..."]@
-* Must specify @count=none@
+* Specify @count=none@
 * If @select@ is specified, it must include @uuid@
-* Must not specify @limit@, @offset@ or @order@
-* Must not request more items than the maximum response size
+* Not specify @limit@, @offset@ or @order@
+* Not request more items than the maximum response size
 
 This form may be used to request a specific list of objects by uuid which are owned by multiple clusters.
 
index 1d4844486ca3b24f78d92b7ffe15e478250fca75..51b3107eaf69f783f55739fd653ab5161998311e 100644 (file)
@@ -73,51 +73,35 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
        h.proxy.Do(w, req, urlOut, client, filter)
 }
 
-// loadParamsFromForm expects a request with
-// application/x-www-form-urlencoded body.  It parses the query, adds
-// the query parameters to "params", and replaces the request body
-// with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromForm(req *http.Request, params url.Values) error {
-       body, err := ioutil.ReadAll(req.Body)
-       if err != nil {
-               return err
-       }
-       req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
-       var v2 url.Values
-       if v2, err = url.ParseQuery(string(body)); err != nil {
-               return err
-       }
-       for k, v := range v2 {
-               params[k] = append(params[k], v...)
-       }
-       return nil
-}
-
-// loadParamsFromForm expects a request with application/json body.
-// It parses the body, populates "loadInto", and replaces the request
-// body with a buffer holding the original body contents so it can be
-// re-read by downstream proxy steps.
-func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
-       var cl int64
-       if req.ContentLength > 0 {
-               cl = req.ContentLength
+// Buffer request body, parse form parameters in request, and then
+// replace original body with the buffer so it can be re-read by
+// downstream proxy steps.
+func loadParamsFromForm(req *http.Request) error {
+       var postBody *bytes.Buffer
+       if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+               var cl int64
+               if req.ContentLength > 0 {
+                       cl = req.ContentLength
+               }
+               postBody = bytes.NewBuffer(make([]byte, 0, cl))
+               originalBody := req.Body
+               defer originalBody.Close()
+               req.Body = ioutil.NopCloser(io.TeeReader(req.Body, postBody))
        }
-       postBody := bytes.NewBuffer(make([]byte, 0, cl))
-       defer req.Body.Close()
-
-       rdr := io.TeeReader(req.Body, postBody)
 
-       err := json.NewDecoder(rdr).Decode(loadInto)
+       err := req.ParseForm()
        if err != nil {
                return err
        }
-       req.Body = ioutil.NopCloser(postBody)
+
+       if req.Body != nil && postBody != nil {
+               req.Body = ioutil.NopCloser(postBody)
+       }
        return nil
 }
 
 type multiClusterQueryResponseCollector struct {
-       responses []interface{}
+       responses []map[string]interface{}
        error     error
        kind      string
        clusterID string
@@ -131,38 +115,48 @@ func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response
        }
 
        defer resp.Body.Close()
-       loadInto := make(map[string]interface{})
+       var loadInto struct {
+               Kind   string                   `json:"kind"`
+               Items  []map[string]interface{} `json:"items"`
+               Errors []string                 `json:"errors"`
+       }
        err = json.NewDecoder(resp.Body).Decode(&loadInto)
 
-       if err == nil {
-               if resp.StatusCode != http.StatusOK {
-                       c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto["errors"])
-               } else {
-                       c.responses = loadInto["items"].([]interface{})
-                       c.kind, _ = loadInto["kind"].(string)
-               }
-       } else {
+       if err != nil {
                c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+               return nil, nil
+       }
+       if resp.StatusCode != http.StatusOK {
+               c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+               return nil, nil
        }
 
+       c.responses = loadInto.Items
+       c.kind = loadInto.Kind
+
        return nil, nil
 }
 
 func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
-       req *http.Request, params url.Values,
-       clusterID string, uuids []string) (rp []interface{}, kind string, err error) {
+       req *http.Request,
+       clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
 
        found := make(map[string]bool)
-       for len(uuids) > 0 {
+       prev_len_uuids := len(uuids) + 1
+       // Loop while
+       // (1) there are more uuids to query
+       // (2) we're making progress - on each iteration the set of
+       // uuids we are expecting for must shrink.
+       for len(uuids) > 0 && len(uuids) < prev_len_uuids {
                var remoteReq http.Request
                remoteReq.Header = req.Header
                remoteReq.Method = "POST"
                remoteReq.URL = &url.URL{Path: req.URL.Path}
                remoteParams := make(url.Values)
-               remoteParams["_method"] = []string{"GET"}
-               remoteParams["count"] = []string{"none"}
-               if len(params["select"]) != 0 {
-                       remoteParams["select"] = params["select"]
+               remoteParams.Set("_method", "GET")
+               remoteParams.Set("count", "none")
+               if req.Form.Get("select") != "" {
+                       remoteParams.Set("select", req.Form.Get("select"))
                }
                content, err := json.Marshal(uuids)
                if err != nil {
@@ -200,12 +194,9 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
                // around and do another request with just the
                // stragglers.
                for _, i := range rc.responses {
-                       m, ok := i.(map[string]interface{})
+                       uuid, ok := i["uuid"].(string)
                        if ok {
-                               uuid, ok := m["uuid"].(string)
-                               if ok {
-                                       found[uuid] = true
-                               }
+                               found[uuid] = true
                        }
                }
 
@@ -215,17 +206,32 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
                                l = append(l, u)
                        }
                }
+               prev_len_uuids = len(uuids)
                uuids = l
        }
 
        return rp, kind, nil
 }
 
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
-       params url.Values, clusterId *string) bool {
+func (h *Handler) FederatedRequestConcurrency() int {
+       if h.Cluster.FederatedRequestConcurrency == 0 {
+               return 4
+       }
+       return h.Cluster.FederatedRequestConcurrency
+}
+
+func (h *Handler) MaxItemsPerResponse() int {
+       if h.Cluster.MaxItemsPerResponse == 0 {
+               return 1000
+       }
+       return h.Cluster.MaxItemsPerResponse
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+       req *http.Request, clusterId *string) bool {
 
        var filters [][]interface{}
-       err := json.Unmarshal([]byte(params["filters"][0]), &filters)
+       err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
        if err != nil {
                httpserver.Error(w, err.Error(), http.StatusBadRequest)
                return true
@@ -234,40 +240,40 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
        // Split the list of uuids by prefix
        queryClusters := make(map[string][]string)
        expectCount := 0
-       for _, f1 := range filters {
-               if len(f1) != 3 {
+       for _, filter := range filters {
+               if len(filter) != 3 {
+                       return false
+               }
+
+               if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+                       return false
+               }
+
+               op, ok := filter[1].(string)
+               if !ok {
                        return false
                }
-               lhs, ok := f1[0].(string)
-               if ok && lhs == "uuid" {
-                       op, ok := f1[1].(string)
-                       if !ok {
-                               return false
-                       }
 
-                       if op == "in" {
-                               rhs, ok := f1[2].([]interface{})
-                               if ok {
-                                       for _, i := range rhs {
-                                               u := i.(string)
+               if op == "in" {
+                       if rhs, ok := filter[2].([]interface{}); ok {
+                               for _, i := range rhs {
+                                       if u, ok := i.(string); ok {
                                                *clusterId = u[0:5]
                                                queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+                                               expectCount += 1
                                        }
-                                       expectCount += len(rhs)
                                }
-                       } else if op == "=" {
-                               u, ok := f1[2].(string)
-                               if ok {
-                                       *clusterId = u[0:5]
-                                       queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-                                       expectCount += 1
-                               }
-                       } else {
-                               return false
+                       }
+               } else if op == "=" {
+                       if u, ok := filter[2].(string); ok {
+                               *clusterId = u[0:5]
+                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+                               expectCount += 1
                        }
                } else {
                        return false
                }
+
        }
 
        if len(queryClusters) <= 1 {
@@ -277,31 +283,31 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
        }
 
        // Validations
-       if !(len(params["count"]) == 1 && (params["count"][0] == `none` ||
-               params["count"][0] == `"none"`)) {
+       count := req.Form.Get("count")
+       if count != "" && count != `none` && count != `"none"` {
                httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
                return true
        }
-       if len(params["limit"]) != 0 || len(params["offset"]) != 0 || len(params["order"]) != 0 {
+       if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
                httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
                return true
        }
-       if expectCount > h.handler.Cluster.MaxItemsPerResponse {
+       if expectCount > h.handler.MaxItemsPerResponse() {
                httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-                       expectCount, h.handler.Cluster.MaxItemsPerResponse), http.StatusBadRequest)
+                       expectCount, h.handler.MaxItemsPerResponse()), http.StatusBadRequest)
                return true
        }
-       if len(params["select"]) == 1 {
+       if req.Form.Get("select") != "" {
                foundUUID := false
-               var selects []interface{}
-               err := json.Unmarshal([]byte(params["select"][0]), &selects)
+               var selects []string
+               err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
                if err != nil {
                        httpserver.Error(w, err.Error(), http.StatusBadRequest)
                        return true
                }
 
                for _, r := range selects {
-                       if r.(string) == "uuid" {
+                       if r == "uuid" {
                                foundUUID = true
                                break
                        }
@@ -312,18 +318,18 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                }
        }
 
-       // Perform parallel requests to each cluster
+       // Perform concurrent requests to each cluster
 
-       // use channel as a semaphore to limit the number of parallel
+       // use channel as a semaphore to limit the number of concurrent
        // requests at a time
-       sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+       sem := make(chan bool, h.handler.FederatedRequestConcurrency())
        defer close(sem)
        wg := sync.WaitGroup{}
 
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
        mtx := sync.Mutex{}
        errors := []error{}
-       var completeResponses []interface{}
+       var completeResponses []map[string]interface{}
        var kind string
 
        for k, v := range queryClusters {
@@ -337,7 +343,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response
                sem <- true
                wg.Add(1)
                go func(k string, v []string) {
-                       rp, kn, err := h.remoteQueryUUIDs(w, req, params, k, v)
+                       rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
                        mtx.Lock()
                        if err == nil {
                                completeResponses = append(completeResponses, rp...)
@@ -379,25 +385,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
                clusterId = m[2]
        }
 
-       // First, parse the query portion of the URL.
-       var params url.Values
-       var err error
-       if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
+       // Get form parameters from URL and form body (if POST).
+       if err := loadParamsFromForm(req); err != nil {
                httpserver.Error(w, err.Error(), http.StatusBadRequest)
                return
        }
 
-       // Next, if appropriate, merge in parameters from the form POST body.
-       if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
-               if err = loadParamsFromForm(req, params); err != nil {
-                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
-                       return
-               }
-       }
-
        // Check if the parameters have an explicit cluster_id
-       if len(params["cluster_id"]) == 1 {
-               clusterId = params["cluster_id"][0]
+       if req.Form.Get("cluster_id") != "" {
+               clusterId = req.Form.Get("cluster_id")
        }
 
        // Handle the POST-as-GET special case (workaround for large
@@ -405,14 +401,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h
        // like multi-object queries where the filter has 100s of
        // items)
        effectiveMethod := req.Method
-       if req.Method == "POST" && len(params["_method"]) == 1 {
-               effectiveMethod = params["_method"][0]
+       if req.Method == "POST" && req.Form.Get("_method") != "" {
+               effectiveMethod = req.Form.Get("_method")
        }
 
-       if effectiveMethod == "GET" && clusterId == "" && len(params["filters"]) == 1 {
-               if h.handleMultiClusterQuery(w, req, params, &clusterId) {
-                       return
-               }
+       if effectiveMethod == "GET" &&
+               clusterId == "" &&
+               req.Form.Get("filters") != "" &&
+               h.handleMultiClusterQuery(w, req, &clusterId) {
+               return
        }
 
        if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
@@ -667,9 +664,9 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
        var errors []string
        var errorCode int = 404
 
-       // use channel as a semaphore to limit the number of parallel
+       // use channel as a semaphore to limit the number of concurrent
        // requests at a time
-       sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests)
+       sem := make(chan bool, h.handler.FederatedRequestConcurrency())
        defer close(sem)
        for remoteID := range h.handler.Cluster.RemoteClusters {
                // blocks until it can put a value into the
index 0b62ce5ff9e8881b40ea0d0b7f6239e2a6bc8ace..1b099be5e002f55e2f1be9ce18fe6a5e9535d1fa 100644 (file)
@@ -63,8 +63,8 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
                NodeProfiles: map[string]arvados.NodeProfile{
                        "*": nodeProfile,
                },
-               MaxItemsPerResponse:    1000,
-               ParallelRemoteRequests: 4,
+               MaxItemsPerResponse:         1000,
+               FederatedRequestConcurrency: 4,
        }, NodeProfile: &nodeProfile}
        s.testServer = newServerFromIntegrationTestEnv(c)
        s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler))
index f309ac7bd130306cc6a2acb721374eb59612e2f9..7101693f3fbd9cbd2d2e8df4ac3f45cca0f0907b 100644 (file)
@@ -51,15 +51,15 @@ func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
 }
 
 type Cluster struct {
-       ClusterID              string `json:"-"`
-       ManagementToken        string
-       NodeProfiles           map[string]NodeProfile
-       InstanceTypes          InstanceTypeMap
-       HTTPRequestTimeout     Duration
-       RemoteClusters         map[string]RemoteCluster
-       PostgreSQL             PostgreSQL
-       MaxItemsPerResponse    int
-       ParallelRemoteRequests int
+       ClusterID                   string `json:"-"`
+       ManagementToken             string
+       NodeProfiles                map[string]NodeProfile
+       InstanceTypes               InstanceTypeMap
+       HTTPRequestTimeout          Duration
+       RemoteClusters              map[string]RemoteCluster
+       PostgreSQL                  PostgreSQL
+       MaxItemsPerResponse         int
+       FederatedRequestConcurrency int
 }
 
 type PostgreSQL struct {