From cdaca3963cf2d07e81eb71ac33e4c0966dec9b93 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 1 Oct 2018 15:48:51 -0400 Subject: [PATCH] 13619: Code cleanups Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- doc/api/methods.html.textile.liquid | 8 +- lib/controller/federation.go | 235 ++++++++++++++-------------- lib/controller/federation_test.go | 4 +- sdk/go/arvados/config.go | 18 +-- 4 files changed, 131 insertions(+), 134 deletions(-) diff --git a/doc/api/methods.html.textile.liquid b/doc/api/methods.html.textile.liquid index 6f3426523b..067018f9d1 100644 --- a/doc/api/methods.html.textile.liquid +++ b/doc/api/methods.html.textile.liquid @@ -125,13 +125,13 @@ h4. Federated listing Federated listing forwards a request to multiple clusters and combines the results. Currently only a very restricted form of the "list" method is supported. -To query multiple clusters, the list method must: +To query multiple clusters, the list request must: * Have filters only matching @[["uuid", "in", [...]]@ or @["uuid", "=", "..."]@ -* Must specify @count=none@ +* Specify @count=none@ * If @select@ is specified, it must include @uuid@ -* Must not specify @limit@, @offset@ or @order@ -* Must not request more items than the maximum response size +* Not specify @limit@, @offset@ or @order@ +* Not request more items than the maximum response size This form may be used to request a specific list of objects by uuid which are owned by multiple clusters. diff --git a/lib/controller/federation.go b/lib/controller/federation.go index 1d4844486c..51b3107eaf 100644 --- a/lib/controller/federation.go +++ b/lib/controller/federation.go @@ -73,51 +73,35 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r h.proxy.Do(w, req, urlOut, client, filter) } -// loadParamsFromForm expects a request with -// application/x-www-form-urlencoded body. It parses the query, adds -// the query parameters to "params", and replaces the request body -// with a buffer holding the original body contents so it can be -// re-read by downstream proxy steps. -func loadParamsFromForm(req *http.Request, params url.Values) error { - body, err := ioutil.ReadAll(req.Body) - if err != nil { - return err - } - req.Body = ioutil.NopCloser(bytes.NewBuffer(body)) - var v2 url.Values - if v2, err = url.ParseQuery(string(body)); err != nil { - return err - } - for k, v := range v2 { - params[k] = append(params[k], v...) - } - return nil -} - -// loadParamsFromForm expects a request with application/json body. -// It parses the body, populates "loadInto", and replaces the request -// body with a buffer holding the original body contents so it can be -// re-read by downstream proxy steps. -func loadParamsFromJson(req *http.Request, loadInto interface{}) error { - var cl int64 - if req.ContentLength > 0 { - cl = req.ContentLength +// Buffer request body, parse form parameters in request, and then +// replace original body with the buffer so it can be re-read by +// downstream proxy steps. +func loadParamsFromForm(req *http.Request) error { + var postBody *bytes.Buffer + if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" { + var cl int64 + if req.ContentLength > 0 { + cl = req.ContentLength + } + postBody = bytes.NewBuffer(make([]byte, 0, cl)) + originalBody := req.Body + defer originalBody.Close() + req.Body = ioutil.NopCloser(io.TeeReader(req.Body, postBody)) } - postBody := bytes.NewBuffer(make([]byte, 0, cl)) - defer req.Body.Close() - - rdr := io.TeeReader(req.Body, postBody) - err := json.NewDecoder(rdr).Decode(loadInto) + err := req.ParseForm() if err != nil { return err } - req.Body = ioutil.NopCloser(postBody) + + if req.Body != nil && postBody != nil { + req.Body = ioutil.NopCloser(postBody) + } return nil } type multiClusterQueryResponseCollector struct { - responses []interface{} + responses []map[string]interface{} error error kind string clusterID string @@ -131,38 +115,48 @@ func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response } defer resp.Body.Close() - loadInto := make(map[string]interface{}) + var loadInto struct { + Kind string `json:"kind"` + Items []map[string]interface{} `json:"items"` + Errors []string `json:"errors"` + } err = json.NewDecoder(resp.Body).Decode(&loadInto) - if err == nil { - if resp.StatusCode != http.StatusOK { - c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto["errors"]) - } else { - c.responses = loadInto["items"].([]interface{}) - c.kind, _ = loadInto["kind"].(string) - } - } else { + if err != nil { c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err) + return nil, nil + } + if resp.StatusCode != http.StatusOK { + c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors) + return nil, nil } + c.responses = loadInto.Items + c.kind = loadInto.Kind + return nil, nil } func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter, - req *http.Request, params url.Values, - clusterID string, uuids []string) (rp []interface{}, kind string, err error) { + req *http.Request, + clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) { found := make(map[string]bool) - for len(uuids) > 0 { + prev_len_uuids := len(uuids) + 1 + // Loop while + // (1) there are more uuids to query + // (2) we're making progress - on each iteration the set of + // uuids we are expecting for must shrink. + for len(uuids) > 0 && len(uuids) < prev_len_uuids { var remoteReq http.Request remoteReq.Header = req.Header remoteReq.Method = "POST" remoteReq.URL = &url.URL{Path: req.URL.Path} remoteParams := make(url.Values) - remoteParams["_method"] = []string{"GET"} - remoteParams["count"] = []string{"none"} - if len(params["select"]) != 0 { - remoteParams["select"] = params["select"] + remoteParams.Set("_method", "GET") + remoteParams.Set("count", "none") + if req.Form.Get("select") != "" { + remoteParams.Set("select", req.Form.Get("select")) } content, err := json.Marshal(uuids) if err != nil { @@ -200,12 +194,9 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter, // around and do another request with just the // stragglers. for _, i := range rc.responses { - m, ok := i.(map[string]interface{}) + uuid, ok := i["uuid"].(string) if ok { - uuid, ok := m["uuid"].(string) - if ok { - found[uuid] = true - } + found[uuid] = true } } @@ -215,17 +206,32 @@ func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter, l = append(l, u) } } + prev_len_uuids = len(uuids) uuids = l } return rp, kind, nil } -func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request, - params url.Values, clusterId *string) bool { +func (h *Handler) FederatedRequestConcurrency() int { + if h.Cluster.FederatedRequestConcurrency == 0 { + return 4 + } + return h.Cluster.FederatedRequestConcurrency +} + +func (h *Handler) MaxItemsPerResponse() int { + if h.Cluster.MaxItemsPerResponse == 0 { + return 1000 + } + return h.Cluster.MaxItemsPerResponse +} + +func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, + req *http.Request, clusterId *string) bool { var filters [][]interface{} - err := json.Unmarshal([]byte(params["filters"][0]), &filters) + err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters) if err != nil { httpserver.Error(w, err.Error(), http.StatusBadRequest) return true @@ -234,40 +240,40 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response // Split the list of uuids by prefix queryClusters := make(map[string][]string) expectCount := 0 - for _, f1 := range filters { - if len(f1) != 3 { + for _, filter := range filters { + if len(filter) != 3 { + return false + } + + if lhs, ok := filter[0].(string); !ok || lhs != "uuid" { + return false + } + + op, ok := filter[1].(string) + if !ok { return false } - lhs, ok := f1[0].(string) - if ok && lhs == "uuid" { - op, ok := f1[1].(string) - if !ok { - return false - } - if op == "in" { - rhs, ok := f1[2].([]interface{}) - if ok { - for _, i := range rhs { - u := i.(string) + if op == "in" { + if rhs, ok := filter[2].([]interface{}); ok { + for _, i := range rhs { + if u, ok := i.(string); ok { *clusterId = u[0:5] queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u) + expectCount += 1 } - expectCount += len(rhs) } - } else if op == "=" { - u, ok := f1[2].(string) - if ok { - *clusterId = u[0:5] - queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u) - expectCount += 1 - } - } else { - return false + } + } else if op == "=" { + if u, ok := filter[2].(string); ok { + *clusterId = u[0:5] + queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u) + expectCount += 1 } } else { return false } + } if len(queryClusters) <= 1 { @@ -277,31 +283,31 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response } // Validations - if !(len(params["count"]) == 1 && (params["count"][0] == `none` || - params["count"][0] == `"none"`)) { + count := req.Form.Get("count") + if count != "" && count != `none` && count != `"none"` { httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest) return true } - if len(params["limit"]) != 0 || len(params["offset"]) != 0 || len(params["order"]) != 0 { + if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" { httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest) return true } - if expectCount > h.handler.Cluster.MaxItemsPerResponse { + if expectCount > h.handler.MaxItemsPerResponse() { httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.", - expectCount, h.handler.Cluster.MaxItemsPerResponse), http.StatusBadRequest) + expectCount, h.handler.MaxItemsPerResponse()), http.StatusBadRequest) return true } - if len(params["select"]) == 1 { + if req.Form.Get("select") != "" { foundUUID := false - var selects []interface{} - err := json.Unmarshal([]byte(params["select"][0]), &selects) + var selects []string + err := json.Unmarshal([]byte(req.Form.Get("select")), &selects) if err != nil { httpserver.Error(w, err.Error(), http.StatusBadRequest) return true } for _, r := range selects { - if r.(string) == "uuid" { + if r == "uuid" { foundUUID = true break } @@ -312,18 +318,18 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response } } - // Perform parallel requests to each cluster + // Perform concurrent requests to each cluster - // use channel as a semaphore to limit the number of parallel + // use channel as a semaphore to limit the number of concurrent // requests at a time - sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests) + sem := make(chan bool, h.handler.FederatedRequestConcurrency()) defer close(sem) wg := sync.WaitGroup{} req.Header.Set("Content-Type", "application/x-www-form-urlencoded") mtx := sync.Mutex{} errors := []error{} - var completeResponses []interface{} + var completeResponses []map[string]interface{} var kind string for k, v := range queryClusters { @@ -337,7 +343,7 @@ func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.Response sem <- true wg.Add(1) go func(k string, v []string) { - rp, kn, err := h.remoteQueryUUIDs(w, req, params, k, v) + rp, kn, err := h.remoteQueryUUIDs(w, req, k, v) mtx.Lock() if err == nil { completeResponses = append(completeResponses, rp...) @@ -379,25 +385,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h clusterId = m[2] } - // First, parse the query portion of the URL. - var params url.Values - var err error - if params, err = url.ParseQuery(req.URL.RawQuery); err != nil { + // Get form parameters from URL and form body (if POST). + if err := loadParamsFromForm(req); err != nil { httpserver.Error(w, err.Error(), http.StatusBadRequest) return } - // Next, if appropriate, merge in parameters from the form POST body. - if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" { - if err = loadParamsFromForm(req, params); err != nil { - httpserver.Error(w, err.Error(), http.StatusBadRequest) - return - } - } - // Check if the parameters have an explicit cluster_id - if len(params["cluster_id"]) == 1 { - clusterId = params["cluster_id"][0] + if req.Form.Get("cluster_id") != "" { + clusterId = req.Form.Get("cluster_id") } // Handle the POST-as-GET special case (workaround for large @@ -405,14 +401,15 @@ func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *h // like multi-object queries where the filter has 100s of // items) effectiveMethod := req.Method - if req.Method == "POST" && len(params["_method"]) == 1 { - effectiveMethod = params["_method"][0] + if req.Method == "POST" && req.Form.Get("_method") != "" { + effectiveMethod = req.Form.Get("_method") } - if effectiveMethod == "GET" && clusterId == "" && len(params["filters"]) == 1 { - if h.handleMultiClusterQuery(w, req, params, &clusterId) { - return - } + if effectiveMethod == "GET" && + clusterId == "" && + req.Form.Get("filters") != "" && + h.handleMultiClusterQuery(w, req, &clusterId) { + return } if clusterId == "" || clusterId == h.handler.Cluster.ClusterID { @@ -667,9 +664,9 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req var errors []string var errorCode int = 404 - // use channel as a semaphore to limit the number of parallel + // use channel as a semaphore to limit the number of concurrent // requests at a time - sem := make(chan bool, h.handler.Cluster.ParallelRemoteRequests) + sem := make(chan bool, h.handler.FederatedRequestConcurrency()) defer close(sem) for remoteID := range h.handler.Cluster.RemoteClusters { // blocks until it can put a value into the diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go index 0b62ce5ff9..1b099be5e0 100644 --- a/lib/controller/federation_test.go +++ b/lib/controller/federation_test.go @@ -63,8 +63,8 @@ func (s *FederationSuite) SetUpTest(c *check.C) { NodeProfiles: map[string]arvados.NodeProfile{ "*": nodeProfile, }, - MaxItemsPerResponse: 1000, - ParallelRemoteRequests: 4, + MaxItemsPerResponse: 1000, + FederatedRequestConcurrency: 4, }, NodeProfile: &nodeProfile} s.testServer = newServerFromIntegrationTestEnv(c) s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler)) diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index f309ac7bd1..7101693f3f 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -51,15 +51,15 @@ func (sc *Config) GetCluster(clusterID string) (*Cluster, error) { } type Cluster struct { - ClusterID string `json:"-"` - ManagementToken string - NodeProfiles map[string]NodeProfile - InstanceTypes InstanceTypeMap - HTTPRequestTimeout Duration - RemoteClusters map[string]RemoteCluster - PostgreSQL PostgreSQL - MaxItemsPerResponse int - ParallelRemoteRequests int + ClusterID string `json:"-"` + ManagementToken string + NodeProfiles map[string]NodeProfile + InstanceTypes InstanceTypeMap + HTTPRequestTimeout Duration + RemoteClusters map[string]RemoteCluster + PostgreSQL PostgreSQL + MaxItemsPerResponse int + FederatedRequestConcurrency int } type PostgreSQL struct { -- 2.39.5