"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"regexp"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
-var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-)?.*$`
+// pathPattern is a fmt template used to build the per-resource path regexps
+// below. The first %s is the resource name in the URL and the second %s is
+// the five-character type infix expected in a UUID for that resource.
+// Submatch 2 is the five-character prefix of a UUID found in the path
+// (ServeHTTP uses it as the cluster ID); submatch 3 is whatever follows.
+var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
var wfRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
h.proxy.Do(w, req, urlOut, client, filter)
}
+// loadParamsFromForm consumes the request body, parses it as a URL-encoded
+// form, and appends every parsed value to params under its key (values that
+// are already present in params are kept). The body is replaced with an
+// in-memory copy so it can still be read by whoever handles req next.
+//
+// It returns the read error or the form-parsing error, if any; on a parsing
+// error params is left untouched.
+func loadParamsFromForm(req *http.Request, params url.Values) error {
+	raw, err := ioutil.ReadAll(req.Body)
+	if err != nil {
+		return err
+	}
+	// Put the consumed bytes back for downstream readers.
+	req.Body = ioutil.NopCloser(bytes.NewBuffer(raw))
+
+	parsed, err := url.ParseQuery(string(raw))
+	if err != nil {
+		return err
+	}
+	for key := range parsed {
+		params[key] = append(params[key], parsed[key]...)
+	}
+	return nil
+}
+
+// loadParamsFromJson decodes the JSON request body into loadInto and then
+// replaces req.Body with an in-memory copy of the complete body, so the
+// request can still be forwarded or re-read afterwards.
+//
+// It returns any decoding or read error; in that case req.Body is left as
+// the original (now closed) body.
+func loadParamsFromJson(req *http.Request, loadInto interface{}) error {
+	// Content-Length is supplied by the client, so it is used only as a
+	// bounded capacity hint; the buffer still grows as needed.
+	const maxPrealloc = 1 << 20
+	var cl int64
+	if req.ContentLength > 0 {
+		cl = req.ContentLength
+	}
+	if cl > maxPrealloc {
+		cl = maxPrealloc
+	}
+	postBody := bytes.NewBuffer(make([]byte, 0, cl))
+	defer req.Body.Close()
+
+	rdr := io.TeeReader(req.Body, postBody)
+
+	if err := json.NewDecoder(rdr).Decode(loadInto); err != nil {
+		return err
+	}
+	// The decoder stops reading once it has a complete JSON value, so
+	// anything it has not pulled through the tee yet would be missing from
+	// postBody. Drain the remainder to capture the whole body.
+	if _, err := io.Copy(ioutil.Discard, rdr); err != nil {
+		return err
+	}
+	req.Body = ioutil.NopCloser(postBody)
+	return nil
+}
+
+// responseCollector accumulates the outcome of several requests that may
+// complete concurrently: the items decoded from successful responses and
+// the errors from failed ones.
+type responseCollector struct {
+ mtx sync.Mutex // held while appending to responses or errors
+ responses []interface{} // elements of the "items" lists from decoded responses
+ errors []error // request errors and response-decoding errors
+}
+
+// collectResponse records the outcome of one request in the collector.
+//
+// If requestError is non-nil it is stored as an error. Otherwise the
+// response body is decoded as a JSON object and the elements of its "items"
+// array are appended to the collected responses; a body that is not valid
+// JSON, or that has no "items" array (for example an error payload), is
+// stored as an error instead of causing a panic.
+//
+// It always returns (nil, nil): the outcome is reported through the
+// collector rather than through the return values.
+func (c *responseCollector) collectResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+	if requestError != nil {
+		c.mtx.Lock()
+		defer c.mtx.Unlock()
+		c.errors = append(c.errors, requestError)
+		return nil, nil
+	}
+	defer resp.Body.Close()
+
+	// Decode and validate before taking the lock so that slow body reads
+	// do not block other collectors.
+	payload := make(map[string]interface{})
+	decodeErr := json.NewDecoder(resp.Body).Decode(&payload)
+	var items []interface{}
+	if decodeErr == nil {
+		var ok bool
+		if items, ok = payload["items"].([]interface{}); !ok {
+			decodeErr = fmt.Errorf("response with status %d has no \"items\" array", resp.StatusCode)
+		}
+	}
+
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	if decodeErr != nil {
+		c.errors = append(c.errors, decodeErr)
+		return nil, nil
+	}
+	c.responses = append(c.responses, items...)
+	return nil, nil
+}
+
+// groupUUIDsByCluster inspects decoded list filters and, when they consist
+// of exactly one filter of the form ["uuid", "in", [<uuid strings>]], groups
+// the UUIDs by their five-character prefix. It also returns the prefix of
+// the last UUID in the list.
+//
+// Any other filter shape (different attribute or operator, non-list operand,
+// non-string or too-short list elements) yields a nil map and "".
+func groupUUIDsByCluster(filters [][]interface{}) (map[string][]string, string) {
+	if len(filters) != 1 || len(filters[0]) != 3 {
+		return nil, ""
+	}
+	f := filters[0]
+	if lhs, ok := f[0].(string); !ok || lhs != "uuid" {
+		return nil, ""
+	}
+	if op, ok := f[1].(string); !ok || op != "in" {
+		return nil, ""
+	}
+	rhs, ok := f[2].([]interface{})
+	if !ok {
+		return nil, ""
+	}
+	byCluster := make(map[string][]string)
+	last := ""
+	for _, item := range rhs {
+		u, ok := item.(string)
+		if !ok || len(u) < 5 {
+			return nil, ""
+		}
+		last = u[:5]
+		byCluster[last] = append(byCluster[last], u)
+	}
+	return byCluster, last
+}
+
+// handleMultiClusterQuery handles a list request whose only filter is
+// ["uuid", "in", [...]] with UUIDs that belong to more than one cluster.
+// The UUIDs are grouped by cluster prefix, one request per cluster is issued
+// (at most four at a time), and the "items" of all replies are merged into
+// a single JSON response.
+//
+// params must hold the request parameters, including "filters". clusterId
+// is an in/out argument: when the filter names UUIDs, it is set to the
+// prefix of the last UUID listed, so that a query touching a single cluster
+// can be routed by the caller.
+//
+// It returns true when a response (merged result or error) has been written
+// to w, and false when the request is not a multi-cluster query and the
+// caller should process it normally. Filters that are not of the expected
+// shape are left for the caller rather than rejected here.
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter, req *http.Request,
+	params url.Values, clusterId *string) bool {
+
+	if len(params["filters"]) == 0 {
+		return false
+	}
+	var filters [][]interface{}
+	if err := json.Unmarshal([]byte(params["filters"][0]), &filters); err != nil {
+		httpserver.Error(w, err.Error(), http.StatusBadRequest)
+		return true
+	}
+
+	queryClusters, lastCluster := groupUUIDsByCluster(filters)
+	if lastCluster != "" {
+		*clusterId = lastCluster
+	}
+	if len(queryClusters) <= 1 {
+		return false
+	}
+
+	wg := sync.WaitGroup{}
+
+	// use channel as a semaphore to limit it to 4
+	// parallel requests at a time
+	sem := make(chan bool, 4)
+	defer close(sem)
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+
+	// NOTE(review): w is handed to every concurrent request below; this
+	// presumably relies on the proxy not writing to w when the response
+	// filter returns a nil response. Confirm against proxy.Do.
+	rc := responseCollector{}
+	for k, v := range queryClusters {
+		// blocks until it can put a value into the
+		// channel (which has a max queue capacity)
+		sem <- true
+		wg.Add(1)
+		go func(k string, v []string) {
+			defer func() {
+				// Free the slot before signalling completion so that no
+				// goroutine still touches sem once Wait has returned.
+				<-sem
+				wg.Done()
+			}()
+			remoteReq := *req
+			remoteReq.Method = "POST"
+			remoteReq.URL = &url.URL{
+				Path:    req.URL.Path,
+				RawPath: req.URL.RawPath,
+			}
+			remoteParams := make(url.Values)
+			remoteParams["_method"] = []string{"GET"}
+			content, err := json.Marshal(v)
+			if err != nil {
+				rc.mtx.Lock()
+				rc.errors = append(rc.errors, err)
+				rc.mtx.Unlock()
+				return
+			}
+			remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
+			enc := remoteParams.Encode()
+			remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
+
+			if k == h.handler.Cluster.ClusterID {
+				h.handler.proxy.Do(w, &remoteReq, remoteReq.URL,
+					h.handler.secureClient, rc.collectResponse)
+			} else {
+				h.handler.remoteClusterRequest(k, w, &remoteReq,
+					rc.collectResponse)
+			}
+		}(k, v)
+	}
+	wg.Wait()
+
+	if len(rc.errors) > 0 {
+		strerr := make([]string, 0, len(rc.errors))
+		for _, e := range rc.errors {
+			strerr = append(strerr, e.Error())
+		}
+		httpserver.Errors(w, strerr, http.StatusBadRequest)
+		return true
+	}
+
+	// Encode an empty result as [] rather than null.
+	items := rc.responses
+	if items == nil {
+		items = []interface{}{}
+	}
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+	if err := json.NewEncoder(w).Encode(map[string]interface{}{"items": items}); err != nil {
+		// The status line is already sent; all that is left is to log.
+		log.Printf("error writing multi-cluster response: %v", err)
+	}
+
+	return true
+}
+
func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
m := h.matcher.FindStringSubmatch(req.URL.Path)
clusterId := ""
- if len(m) == 3 {
+ if len(m) > 0 && m[2] != "" {
clusterId = m[2]
}
- if clusterId == "" {
- if values, err := url.ParseQuery(req.URL.RawQuery); err == nil {
- if len(values["cluster_id"]) == 1 {
- clusterId = values["cluster_id"][0]
- }
+ var params url.Values
+ var err error
+ if params, err = url.ParseQuery(req.URL.RawQuery); err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ if req.Method == "POST" && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+ if err = loadParamsFromForm(req, params); err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return
}
}
+ if len(params["cluster_id"]) == 1 {
+ clusterId = params["cluster_id"][0]
+ }
+
if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
var hasClusterId struct {
ClusterID string `json:"cluster_id"`
}
- var cl int64
- if req.ContentLength > 0 {
- cl = req.ContentLength
+ if err = loadParamsFromJson(req, &hasClusterId); err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return
}
- postBody := bytes.NewBuffer(make([]byte, 0, cl))
- defer req.Body.Close()
+ clusterId = hasClusterId.ClusterID
+ }
- rdr := io.TeeReader(req.Body, postBody)
+ effectiveMethod := req.Method
+ if req.Method == "POST" && len(params["_method"]) == 1 {
+ effectiveMethod = params["_method"][0]
+ }
- err := json.NewDecoder(rdr).Decode(&hasClusterId)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ if effectiveMethod == "GET" && clusterId == "" && len(params["filters"]) == 1 {
+ if h.handleMultiClusterQuery(w, req, params, &clusterId) {
return
}
- req.Body = ioutil.NopCloser(postBody)
-
- clusterId = hasClusterId.ClusterID
}
+ log.Printf("Clusterid is %q", clusterId)
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
m = collectionRe.FindStringSubmatch(req.URL.Path)
clusterId := ""
- if len(m) == 3 {
+ if len(m) > 0 {
clusterId = m[2]
}
mux := http.NewServeMux()
mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
- mux.Handle("/arvados/v1/containers", next)
+ mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
import (
"encoding/json"
+ "fmt"
"io/ioutil"
+ "log"
"net/http"
"net/http/httptest"
"net/url"
c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
c.Check(cn.UUID, check.Equals, arvadostest.QueuedContainerUUID)
}
+
+// TestListRemoteContainer lists containers with a uuid-in filter naming a
+// single container and checks that the container is returned. The local
+// service is stubbed to answer 404 for the duration of the test.
+func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
+	defer s.localServiceReturns404(c).Close()
+	req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+		url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v"]]]`, arvadostest.QueuedContainerUUID)), nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+	c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+	var cn arvados.ContainerList
+	c.Assert(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+	// Stop here on an empty list instead of panicking on Items[0].
+	c.Assert(cn.Items, check.Not(check.HasLen), 0)
+	c.Check(cn.Items[0].UUID, check.Equals, arvadostest.QueuedContainerUUID)
+}
+
+// TestListMultiRemoteContainers lists containers with a uuid-in filter
+// whose UUIDs carry two different cluster prefixes and checks that the
+// queued container is among the merged results. The position of the item is
+// not asserted, so the check does not depend on the order in which the
+// per-cluster results were merged.
+func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
+	defer s.localServiceReturns404(c).Close()
+	req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
+		url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`, arvadostest.QueuedContainerUUID)), nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+	resp := s.testRequest(req)
+	log.Printf("got %+v", resp)
+	c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+	var cn arvados.ContainerList
+	c.Assert(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+	found := false
+	for _, item := range cn.Items {
+		if item.UUID == arvadostest.QueuedContainerUUID {
+			found = true
+			break
+		}
+	}
+	c.Check(found, check.Equals, true)
+}