13619: Federated multi-object list wip
author Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 27 Sep 2018 14:53:22 +0000 (10:53 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 27 Sep 2018 14:53:22 +0000 (10:53 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

lib/controller/federation.go
lib/controller/federation_test.go

index cc22a674d5f6993bf7bf30d9ca6472e5b66c64a9..dc03e039faea935289ecd4f4ffe334d3bbd24b0b 100644 (file)
@@ -14,6 +14,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "net/http"
        "net/url"
        "regexp"
@@ -26,7 +27,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
-var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-)?.*$`
+var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
 var wfRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
 var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
 var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
@@ -73,44 +74,218 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
        h.proxy.Do(w, req, urlOut, client, filter)
 }
 
+// loadParamsFromForm parses the request body as URL-encoded form data
+// and appends every value it finds to the matching key in params.
+//
+// The body is read completely and req.Body is replaced with an
+// in-memory copy, so it can be read again after this call. An error
+// from reading the body or from parsing the form is returned as-is;
+// params is only modified when both steps succeed.
+func loadParamsFromForm(req *http.Request, params url.Values) error {
+       raw, err := ioutil.ReadAll(req.Body)
+       if err != nil {
+               return err
+       }
+       // Put the consumed bytes back before doing anything that can fail.
+       req.Body = ioutil.NopCloser(bytes.NewBuffer(raw))
+       parsed, err := url.ParseQuery(string(raw))
+       if err != nil {
+               return err
+       }
+       for key := range parsed {
+               params[key] = append(params[key], parsed[key]...)
+       }
+       return nil
+}
+
+// loadParamsFromJson decodes the first JSON value in the request body
+// into loadInto, then replaces req.Body with a buffered copy of the
+// complete original body so the request can still be read afterwards.
+//
+// The JSON decoder pulls only as many bytes through the tee as it
+// needs to parse one value, so whatever follows that value is drained
+// explicitly; without this the replayed body could be shorter than the
+// one the client sent. The body is restored even when decoding fails.
+// A decode error takes precedence over an error hit while draining.
+func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
+       var cl int64
+       if req.ContentLength > 0 {
+               cl = req.ContentLength
+       }
+       postBody := bytes.NewBuffer(make([]byte, 0, cl))
+       origBody := req.Body
+       defer origBody.Close()
+
+       // Everything read from rdr is also appended to postBody.
+       rdr := io.TeeReader(origBody, postBody)
+
+       decodeErr := json.NewDecoder(rdr).Decode(loadInto)
+       // Pull the unread tail of the body through the tee as well.
+       _, drainErr := io.Copy(ioutil.Discard, rdr)
+       req.Body = ioutil.NopCloser(postBody)
+       if decodeErr != nil {
+               return decodeErr
+       }
+       return drainErr
+}
+
+// responseCollector accumulates the outcome of several list requests
+// that run concurrently. mtx guards both slices.
+type responseCollector struct {
+       mtx       sync.Mutex
+       responses []interface{} // merged "items" of all successful responses
+       errors    []error       // one entry per failed request or unusable response
+}
+
+// collectResponse records the result of one request. A non-nil
+// requestError is stored as-is. Otherwise the response body is decoded
+// as a JSON object and the elements of its "items" array are appended
+// to c.responses; a body that is not valid JSON, or that has no
+// "items" array (for example an error document), is recorded in
+// c.errors instead of panicking on the type assertion.
+//
+// It always returns (nil, nil): the outcome is only reported through
+// the collector's fields.
+func (c *responseCollector) collectResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+       if requestError != nil {
+               c.mtx.Lock()
+               defer c.mtx.Unlock()
+               c.errors = append(c.errors, requestError)
+               return nil, nil
+       }
+       defer resp.Body.Close()
+
+       // Decode before taking the lock so slow bodies do not block
+       // other goroutines using the collector.
+       loadInto := make(map[string]interface{})
+       var items []interface{}
+       decodeErr := json.NewDecoder(resp.Body).Decode(&loadInto)
+       if decodeErr == nil {
+               var ok bool
+               if items, ok = loadInto["items"].([]interface{}); !ok {
+                       decodeErr = fmt.Errorf("response with status %q has no \"items\" list", resp.Status)
+               }
+       }
+
+       c.mtx.Lock()
+       defer c.mtx.Unlock()
+
+       if decodeErr != nil {
+               c.errors = append(c.errors, decodeErr)
+       } else {
+               c.responses = append(c.responses, items...)
+       }
+
+       return nil, nil
+}
+
+// handleMultiClusterQuery serves a list request whose single filter is
+// ["uuid", "in", [...]] and whose UUIDs carry more than one cluster
+// prefix (the first five characters of each UUID). The UUIDs are
+// grouped by prefix, one request per cluster is issued (at most four
+// at a time), and the "items" of all responses are merged into a
+// single {"items": [...]} JSON reply.
+//
+// It returns true when a response has been written to w and the caller
+// must stop. It returns false when the request is not a multi-cluster
+// query; if every listed UUID has the same prefix, *clusterId is set
+// to that prefix so the caller can route the request as usual.
+//
+// Filters of any other shape (a different attribute or operator, an
+// operand that is not a list, or list entries that are not strings of
+// at least five characters) are left for the caller's normal handling
+// instead of failing a type assertion here.
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
+       params url.Values, clusterId *string) bool {
+
+       if len(params["filters"]) == 0 {
+               return false
+       }
+       var filters [][]interface{}
+       err := json.Unmarshal([]byte(params["filters"][0]), &filters)
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusBadRequest)
+               return true
+       }
+
+       // Group the requested UUIDs by cluster prefix.
+       queryClusters := make(map[string][]string)
+       if len(filters) == 1 && len(filters[0]) == 3 {
+               f1 := filters[0]
+               // Failed assertions leave the zero value, which simply
+               // does not match the pattern below.
+               lhs, _ := f1[0].(string)
+               op, _ := f1[1].(string)
+               rhs, _ := f1[2].([]interface{})
+               if lhs == "uuid" && op == "in" {
+                       for _, i := range rhs {
+                               u, ok := i.(string)
+                               if !ok || len(u) < 5 {
+                                       return false
+                               }
+                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+                       }
+               }
+       }
+
+       if len(queryClusters) <= 1 {
+               // Zero or one cluster: report the prefix (if any) and
+               // let the caller handle the request.
+               for k := range queryClusters {
+                       *clusterId = k
+               }
+               return false
+       }
+
+       var wg sync.WaitGroup
+
+       // use channel as a semaphore to limit it to 4
+       // parallel requests at a time
+       sem := make(chan bool, 4)
+       defer close(sem)
+       req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+
+       var rc responseCollector
+       for k, v := range queryClusters {
+               // blocks until it can put a value into the
+               // channel (which has a max queue capacity)
+               sem <- true
+               wg.Add(1)
+               go func(k string, v []string) {
+                       defer func() {
+                               wg.Done()
+                               <-sem
+                       }()
+                       // Shallow copy: method, URL and body are replaced
+                       // below, the header map is shared with req.
+                       remoteReq := *req
+                       remoteReq.Method = "POST"
+                       remoteReq.URL = &url.URL{
+                               Path:    req.URL.Path,
+                               RawPath: req.URL.RawPath,
+                       }
+                       remoteParams := make(url.Values)
+                       remoteParams["_method"] = []string{"GET"}
+                       content, err := json.Marshal(v)
+                       if err != nil {
+                               rc.mtx.Lock()
+                               rc.errors = append(rc.errors, err)
+                               rc.mtx.Unlock()
+                               return
+                       }
+                       remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
+                       enc := remoteParams.Encode()
+                       remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
+
+                       // NOTE(review): w is handed to every concurrent
+                       // call; this presumably relies on nothing being
+                       // written to w when the filter returns a nil
+                       // response -- confirm against the proxy code.
+                       if k == h.handler.Cluster.ClusterID {
+                               h.handler.proxy.Do(w, &remoteReq, remoteReq.URL,
+                                       h.handler.secureClient, rc.collectResponse)
+                       } else {
+                               h.handler.remoteClusterRequest(k, w, &remoteReq,
+                                       rc.collectResponse)
+                       }
+               }(k, v)
+       }
+       wg.Wait()
+
+       if len(rc.errors) > 0 {
+               strerr := make([]string, 0, len(rc.errors))
+               for _, e := range rc.errors {
+                       strerr = append(strerr, e.Error())
+               }
+               httpserver.Errors(w, strerr, http.StatusBadRequest)
+               return true
+       }
+
+       // A nil slice would be encoded as "items": null.
+       items := rc.responses
+       if items == nil {
+               items = []interface{}{}
+       }
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       // The status line is already sent, so an encoding failure can
+       // only be logged.
+       if err := json.NewEncoder(w).Encode(map[string]interface{}{"items": items}); err != nil {
+               log.Printf("error writing multi-cluster list response: %v", err)
+       }
+
+       return true
+}
+
 func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        m := h.matcher.FindStringSubmatch(req.URL.Path)
        clusterId := ""
 
-       if len(m) == 3 {
+       if len(m) > 0 && m[2] != "" {
                clusterId = m[2]
        }
 
-       if clusterId == "" {
-               if values, err := url.ParseQuery(req.URL.RawQuery); err == nil {
-                       if len(values["cluster_id"]) == 1 {
-                               clusterId = values["cluster_id"][0]
-                       }
+       var params url.Values
+       var err error
+       if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
+               httpserver.Error(w, err.Error(), http.StatusBadRequest)
+               return
+       }
+
+       if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+               if err = loadParamsFromForm(req, params); err != nil {
+                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
+                       return
                }
        }
 
+       if len(params["cluster_id"]) == 1 {
+               clusterId = params["cluster_id"][0]
+       }
+
        if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
                var hasClusterId struct {
                        ClusterID string `json:"cluster_id"`
                }
-               var cl int64
-               if req.ContentLength > 0 {
-                       cl = req.ContentLength
+               if err = loadParamsFromJson(req, &hasClusterId); err != nil {
+                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
+                       return
                }
-               postBody := bytes.NewBuffer(make([]byte, 0, cl))
-               defer req.Body.Close()
+               clusterId = hasClusterId.ClusterID
+       }
 
-               rdr := io.TeeReader(req.Body, postBody)
+       effectiveMethod := req.Method
+       if req.Method == "POST" && len(params["_method"]) == 1 {
+               effectiveMethod = params["_method"][0]
+       }
 
-               err := json.NewDecoder(rdr).Decode(&hasClusterId)
-               if err != nil {
-                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
+       if effectiveMethod == "GET" && clusterId == "" && len(params["filters"]) == 1 {
+               if h.handleMultiClusterQuery(w, req, params, &clusterId) {
                        return
                }
-               req.Body = ioutil.NopCloser(postBody)
-
-               clusterId = hasClusterId.ClusterID
        }
+       log.Printf("Clusterid is %q", clusterId)
 
        if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
                h.next.ServeHTTP(w, req)
@@ -331,7 +506,7 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
                m = collectionRe.FindStringSubmatch(req.URL.Path)
                clusterId := ""
 
-               if len(m) == 3 {
+               if len(m) > 0 {
                        clusterId = m[2]
                }
 
@@ -423,7 +598,7 @@ func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
        mux := http.NewServeMux()
        mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
        mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
-       mux.Handle("/arvados/v1/containers", next)
+       mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
        mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
        mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
        mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
index 3b11625f667787f1e4bca7f0a1ec1d4a1c2ed69a..b94efa6aecc8076e8cabb5c0f899360a3a67917e 100644 (file)
@@ -6,7 +6,9 @@ package controller
 
 import (
        "encoding/json"
+       "fmt"
        "io/ioutil"
+       "log"
        "net/http"
        "net/http/httptest"
        "net/url"
@@ -613,3 +615,28 @@ func (s *FederationSuite) TestGetRemoteContainer(c *check.C) {
        c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
        c.Check(cn.UUID, check.Equals, arvadostest.QueuedContainerUUID)
 }
+
+// TestListRemoteContainer lists containers with a "uuid in [...]"
+// filter naming a single UUID and expects that container to be the
+// first item of the response.
+func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
+       defer s.localServiceReturns404(c).Close()
+       req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+               url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v"]]]`, arvadostest.QueuedContainerUUID)), nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       resp := s.testRequest(req)
+       // Assert rather than Check: the later steps are meaningless
+       // once the status or the decoding is wrong.
+       c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+       var cn arvados.ContainerList
+       c.Assert(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+       // Indexing an empty list would panic instead of failing cleanly.
+       c.Assert(len(cn.Items) > 0, check.Equals, true)
+       c.Check(cn.Items[0].UUID, check.Equals, arvadostest.QueuedContainerUUID)
+}
+
+// TestListMultiRemoteContainers lists containers with a "uuid in
+// [...]" filter naming UUIDs with two different cluster prefixes and
+// expects the queued container to be part of the merged response.
+func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
+       defer s.localServiceReturns404(c).Close()
+       req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+               url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`, arvadostest.QueuedContainerUUID)), nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       resp := s.testRequest(req)
+       log.Printf("got %+v", resp)
+       c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+       var cn arvados.ContainerList
+       c.Assert(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+       // The per-cluster results are merged in the order the parallel
+       // requests complete, so look for the UUID anywhere in the list
+       // instead of relying on it being the first item.
+       found := false
+       for _, item := range cn.Items {
+               if item.UUID == arvadostest.QueuedContainerUUID {
+                       found = true
+                       break
+               }
+       }
+       c.Check(found, check.Equals, true)
+}