14197: Supports routing object create requests, with tests
author Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 21 Sep 2018 18:25:34 +0000 (14:25 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 21 Sep 2018 18:25:34 +0000 (14:25 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

lib/controller/federation.go
lib/controller/federation_test.go
lib/controller/proxy.go
sdk/go/arvados/container.go
sdk/go/arvadostest/fixtures.go

index 7c1dbfb2c2f440002a6f197475173e922e6b19ee..24c692aef57eafbd06a83bfaeb4bb099728c39ad 100644 (file)
@@ -26,10 +26,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
-var wfRe = regexp.MustCompile(`^/arvados/v1/workflows/([0-9a-z]{5})-[^/]+$`)
-var containersRe = regexp.MustCompile(`^/arvados/v1/containers/([0-9a-z]{5})-[^/]+$`)
-var containerRequestsRe = regexp.MustCompile(`^/arvados/v1/container_requests/([0-9a-z]{5})-[^/]+$`)
-var collectionRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-z]{5})-[^/]+$`)
+var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-)?.*$` // format args: resource path segment, UUID type infix; submatch 2 is the 5-char UUID prefix the handlers use as cluster ID
+var wfRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
+var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
+var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
+var collectionRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
 var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
 
 type genericFederatedRequestHandler struct {
@@ -74,11 +75,47 @@ func (h *Handler) remoteClusterRequest(remoteID string, w http.ResponseWriter, r
 
 func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        m := h.matcher.FindStringSubmatch(req.URL.Path)
-       if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
+       clusterId := ""
+
+       if len(m) == 3 {
+               clusterId = m[2]
+       }
+
+       if clusterId == "" {
+               if values, err := url.ParseQuery(req.URL.RawQuery); err == nil {
+                       if len(values["cluster_id"]) == 1 {
+                               clusterId = values["cluster_id"][0]
+                       }
+               }
+       }
+
+       if clusterId == "" && req.Method == "POST" {
+               var hasClusterId struct {
+                       ClusterID string `json:"cluster_id"`
+               }
+               var cl int64
+               if req.ContentLength > 0 {
+                       cl = req.ContentLength
+               }
+               postBody := bytes.NewBuffer(make([]byte, 0, cl))
+               defer req.Body.Close()
+
+               rdr := io.TeeReader(req.Body, postBody)
+
+               err := json.NewDecoder(rdr).Decode(&hasClusterId)
+               if err != nil {
+                       // TODO
+               }
+               req.Body = ioutil.NopCloser(postBody)
+
+               clusterId = hasClusterId.ClusterID
+       }
+
+       if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
                h.next.ServeHTTP(w, req)
-               return
+       } else {
+               h.handler.remoteClusterRequest(clusterId, w, req, nil)
        }
-       h.handler.remoteClusterRequest(m[1], w, req, nil)
 }
 
 type rewriteSignaturesClusterId struct {
@@ -291,10 +328,16 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
        if len(m) != 2 {
                // Not a collection PDH GET request
                m = collectionRe.FindStringSubmatch(req.URL.Path)
-               if len(m) == 2 && m[1] != h.handler.Cluster.ClusterID {
+               clusterId := ""
+
+               if len(m) == 3 {
+                       clusterId = m[2]
+               }
+
+               if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
                        // request for remote collection by uuid
-                       h.handler.remoteClusterRequest(m[1], w, req,
-                               rewriteSignaturesClusterId{m[1], ""}.rewriteSignatures)
+                       h.handler.remoteClusterRequest(clusterId, w, req,
+                               rewriteSignaturesClusterId{clusterId, ""}.rewriteSignatures)
                        return
                }
                // not a collection UUID request, or it is a request
@@ -377,12 +420,11 @@ func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req
 
 func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
        mux := http.NewServeMux()
-
-       mux.Handle("/arvados/v1/workflows", next)
+       mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
        mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
        mux.Handle("/arvados/v1/containers", next)
        mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
-       mux.Handle("/arvados/v1/container_requests", next)
+       mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
        mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
        mux.Handle("/arvados/v1/collections", next)
        mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
index 52906ead326fd4dad2cd9eda3bf1f7e80a2e7c95..7e7cbf648938dfcb069d874df45aafe6a2f17db4 100644 (file)
@@ -302,6 +302,27 @@ func (s *FederationSuite) checkJSONErrorMatches(c *check.C, resp *http.Response,
        c.Check(jresp.Errors[0], check.Matches, re)
 }
 
+func (s *FederationSuite) localServiceReturns404(c *check.C) *httpserver.Server {
+       srv := &httpserver.Server{
+               Server: http.Server{
+                       Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+                               w.WriteHeader(404)
+                       }),
+               },
+       }
+
+       c.Assert(srv.Start(), check.IsNil)
+
+       np := arvados.NodeProfile{
+               Controller: arvados.SystemServiceInstance{Listen: ":"},
+               RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
+                       TLS: false, Insecure: true}}
+       s.testHandler.Cluster.NodeProfiles["*"] = np
+       s.testHandler.NodeProfile = &np
+
+       return srv
+}
+
 func (s *FederationSuite) TestGetLocalCollection(c *check.C) {
        np := arvados.NodeProfile{
                Controller: arvados.SystemServiceInstance{Listen: ":"},
@@ -325,6 +346,8 @@ func (s *FederationSuite) TestGetLocalCollection(c *check.C) {
 }
 
 func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
+       defer s.localServiceReturns404(c).Close()
+
        req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementCollection, nil)
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
        resp := s.testRequest(req)
@@ -338,6 +361,8 @@ func (s *FederationSuite) TestGetRemoteCollection(c *check.C) {
 }
 
 func (s *FederationSuite) TestGetRemoteCollectionError(c *check.C) {
+       defer s.localServiceReturns404(c).Close()
+
        req := httptest.NewRequest("GET", "/arvados/v1/collections/zzzzz-4zz18-fakefakefakefak", nil)
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
        resp := s.testRequest(req)
@@ -379,29 +404,14 @@ func (s *FederationSuite) TestGetLocalCollectionByPDH(c *check.C) {
 }
 
 func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
-       srv := &httpserver.Server{
-               Server: http.Server{
-                       Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-                               w.WriteHeader(404)
-                       }),
-               },
-       }
-
-       c.Assert(srv.Start(), check.IsNil)
-       defer srv.Close()
-
-       np := arvados.NodeProfile{
-               Controller: arvados.SystemServiceInstance{Listen: ":"},
-               RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
-                       TLS: false, Insecure: true}}
-       s.testHandler.Cluster.NodeProfiles["*"] = np
-       s.testHandler.NodeProfile = &np
+       defer s.localServiceReturns404(c).Close()
 
        req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
        resp := s.testRequest(req)
 
        c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
        var col arvados.Collection
        c.Check(json.NewDecoder(resp.Body).Decode(&col), check.IsNil)
        c.Check(col.PortableDataHash, check.Equals, arvadostest.UserAgreementPDH)
@@ -411,24 +421,7 @@ func (s *FederationSuite) TestGetRemoteCollectionByPDH(c *check.C) {
 }
 
 func (s *FederationSuite) TestGetCollectionByPDHError(c *check.C) {
-       srv := &httpserver.Server{
-               Server: http.Server{
-                       Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-                               w.WriteHeader(404)
-                       }),
-               },
-       }
-
-       c.Assert(srv.Start(), check.IsNil)
-       defer srv.Close()
-
-       // Make the "local" cluster to a service that always returns 404
-       np := arvados.NodeProfile{
-               Controller: arvados.SystemServiceInstance{Listen: ":"},
-               RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
-                       TLS: false, Insecure: true}}
-       s.testHandler.Cluster.NodeProfiles["*"] = np
-       s.testHandler.NodeProfile = &np
+       defer s.localServiceReturns404(c).Close()
 
        req := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
        req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
@@ -440,24 +433,7 @@ func (s *FederationSuite) TestGetCollectionByPDHError(c *check.C) {
 }
 
 func (s *FederationSuite) TestGetCollectionByPDHErrorBadHash(c *check.C) {
-       srv := &httpserver.Server{
-               Server: http.Server{
-                       Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-                               w.WriteHeader(404)
-                       }),
-               },
-       }
-
-       c.Assert(srv.Start(), check.IsNil)
-       defer srv.Close()
-
-       // Make the "local" cluster to a service that always returns 404
-       np := arvados.NodeProfile{
-               Controller: arvados.SystemServiceInstance{Listen: ":"},
-               RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
-                       TLS: false, Insecure: true}}
-       s.testHandler.Cluster.NodeProfiles["*"] = np
-       s.testHandler.NodeProfile = &np
+       defer s.localServiceReturns404(c).Close()
 
        srv2 := &httpserver.Server{
                Server: http.Server{
@@ -530,3 +506,87 @@ func (s *FederationSuite) TestSaltedTokenGetCollectionByPDHError(c *check.C) {
 
        c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
 }
+
+// TestGetRemoteContainerRequest issues a GET for the queued container
+// request fixture while the local API stub answers 404 to everything,
+// and expects a 200 response carrying that record.
+func (s *FederationSuite) TestGetRemoteContainerRequest(c *check.C) {
+       stub := s.localServiceReturns404(c)
+       defer stub.Close()
+
+       uuid := arvadostest.QueuedContainerRequestUUID
+       get := httptest.NewRequest("GET", "/arvados/v1/container_requests/"+uuid, nil)
+       get.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+       resp := s.testRequest(get)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+       var got arvados.ContainerRequest
+       decodeErr := json.NewDecoder(resp.Body).Decode(&got)
+       c.Check(decodeErr, check.IsNil)
+       c.Check(got.UUID, check.Equals, uuid)
+       c.Check(got.Priority, check.Equals, 1)
+}
+
+// TestUpdateRemoteContainerRequest sends a PATCH that sets the
+// priority of the queued container request fixture while the local
+// API stub answers 404 to everything, and expects a 200 response
+// whose record shows the new priority.
+func (s *FederationSuite) TestUpdateRemoteContainerRequest(c *check.C) {
+       stub := s.localServiceReturns404(c)
+       defer stub.Close()
+
+       uuid := arvadostest.QueuedContainerRequestUUID
+       payload := strings.NewReader(`{"container_request": {"priority": 696}}`)
+       patch := httptest.NewRequest("PATCH", "/arvados/v1/container_requests/"+uuid, payload)
+       patch.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       patch.Header.Set("Content-type", "application/json")
+
+       resp := s.testRequest(patch)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+       var got arvados.ContainerRequest
+       decodeErr := json.NewDecoder(resp.Body).Decode(&got)
+       c.Check(decodeErr, check.IsNil)
+       c.Check(got.UUID, check.Equals, uuid)
+       c.Check(got.Priority, check.Equals, 696)
+}
+
+// TestCreateRemoteContainerRequest1 POSTs a new container request
+// whose JSON body names the target cluster in a top-level
+// "cluster_id" key, while the local API stub answers 404 to
+// everything, and expects a 200 response echoing the requested name.
+func (s *FederationSuite) TestCreateRemoteContainerRequest1(c *check.C) {
+       stub := s.localServiceReturns404(c)
+       defer stub.Close()
+
+       payload := strings.NewReader(`{
+  "cluster_id": "zzzzz",
+  "container_request": {
+    "name": "hello world",
+    "state": "Uncommitted",
+    "output_path": "/",
+    "container_image": "123",
+    "command": ["abc"]
+  }
+}
+`)
+       post := httptest.NewRequest("POST", "/arvados/v1/container_requests", payload)
+       post.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       post.Header.Set("Content-type", "application/json")
+
+       resp := s.testRequest(post)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+       var created arvados.ContainerRequest
+       decodeErr := json.NewDecoder(resp.Body).Decode(&created)
+       c.Check(decodeErr, check.IsNil)
+       c.Check(created.Name, check.Equals, "hello world")
+}
+
+// TestCreateRemoteContainerRequest2 POSTs a new container request
+// with the target cluster given as a "cluster_id" query parameter
+// instead of in the body, while the local API stub answers 404 to
+// everything, and expects a 200 response echoing the requested name.
+func (s *FederationSuite) TestCreateRemoteContainerRequest2(c *check.C) {
+       stub := s.localServiceReturns404(c)
+       defer stub.Close()
+
+       // Supplying cluster_id in the query string lets
+       // arvados-controller route the request without parsing the
+       // body.
+       payload := strings.NewReader(`{
+  "container_request": {
+    "name": "hello world",
+    "state": "Uncommitted",
+    "output_path": "/",
+    "container_image": "123",
+    "command": ["abc"]
+  }
+}
+`)
+       post := httptest.NewRequest("POST", "/arvados/v1/container_requests?cluster_id=zzzzz", payload)
+       post.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+       post.Header.Set("Content-type", "application/json")
+
+       resp := s.testRequest(post)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+       var created arvados.ContainerRequest
+       decodeErr := json.NewDecoder(resp.Body).Decode(&created)
+       c.Check(decodeErr, check.IsNil)
+       c.Check(created.Name, check.Equals, "hello world")
+}
+
+// TestGetRemoteContainer issues a GET for the queued container
+// fixture while the local API stub answers 404 to everything, and
+// expects a 200 response carrying that record.
+func (s *FederationSuite) TestGetRemoteContainer(c *check.C) {
+       stub := s.localServiceReturns404(c)
+       defer stub.Close()
+
+       uuid := arvadostest.QueuedContainerUUID
+       get := httptest.NewRequest("GET", "/arvados/v1/containers/"+uuid, nil)
+       get.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+
+       resp := s.testRequest(get)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+
+       var got arvados.Container
+       decodeErr := json.NewDecoder(resp.Body).Decode(&got)
+       c.Check(decodeErr, check.IsNil)
+       c.Check(got.UUID, check.Equals, uuid)
+}
index 707bc026c02ebc68b5a4567941950e712c5cac7e..373b42e8f4fe285f5c89f114d7d2997220618943 100644 (file)
@@ -82,9 +82,12 @@ func (p *proxy) Do(w http.ResponseWriter,
        }
 
        // make sure original response body gets closed
-       originalBody := resp.Body
-       if originalBody != nil {
-               defer originalBody.Close()
+       var originalBody io.ReadCloser
+       if resp != nil {
+               originalBody = resp.Body
+               if originalBody != nil {
+                       defer originalBody.Close()
+               }
        }
 
        if filter != nil {
index 210ed9981c07292ec3c1508da978eaac351acae7..2622c137030aada559bfc47f5768e7d918b6d816 100644 (file)
@@ -24,6 +24,40 @@ type Container struct {
        SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
 }
 
+// ContainerRequest is an Arvados container request resource.
+type ContainerRequest struct {
+       UUID                    string                 `json:"uuid"`
+       OwnerUUID               string                 `json:"owner_uuid"`
+       CreatedAt               time.Time              `json:"created_at"`
+       ModifiedByClientUUID    string                 `json:"modified_by_client_uuid"`
+       ModifiedByUserUUID      string                 `json:"modified_by_user_uuid"`
+       ModifiedAt              time.Time              `json:"modified_at"`
+       Href                    string                 `json:"href"`
+       Kind                    string                 `json:"kind"`
+       Etag                    string                 `json:"etag"`
+       Name                    string                 `json:"name"`
+       Description             string                 `json:"description"`
+       Properties              map[string]interface{} `json:"properties"`
+       State                   ContainerRequestState  `json:"state"`
+       RequestingContainerUUID string                 `json:"requesting_container_uuid"`
+       ContainerUUID           string                 `json:"container_uuid"`
+       ContainerCountMax       int                    `json:"container_count_max"`
+       Mounts                  map[string]Mount       `json:"mounts"`
+       RuntimeConstraints      RuntimeConstraints     `json:"runtime_constraints"`
+       SchedulingParameters    SchedulingParameters   `json:"scheduling_parameters"`
+       ContainerImage          string                 `json:"container_image"`
+       Environment             map[string]string      `json:"environment"`
+       Cwd                     string                 `json:"cwd"`
+       Command                 []string               `json:"command"`
+       OutputPath              string                 `json:"output_path"`
+       OutputName              string                 `json:"output_name"`
+       OutputTTL               int                    `json:"output_ttl"`
+       Priority                int                    `json:"priority"`
+       UseExisting             bool                   `json:"use_existing"`
+       LogUUID                 string                 `json:"log_uuid"`
+       OutputUUID              string                 `json:"output_uuid"`
+}
+
 // Mount is special behavior to attach to a filesystem path or device.
 type Mount struct {
        Kind              string      `json:"kind"`
@@ -75,3 +109,12 @@ const (
        ContainerStateComplete  = ContainerState("Complete")
        ContainerStateCancelled = ContainerState("Cancelled")
 )
+
+// ContainerRequestState is a string corresponding to a valid
+// container request state.
+type ContainerRequestState string
+
+// Container request states. The constants are typed
+// ContainerRequestState so they can be compared with and assigned to
+// a ContainerRequestState value.
+const (
+       // The "Uncomitted" spelling of this identifier is kept so that
+       // existing references keep compiling.
+       ContainerRequestStateUncomitted = ContainerRequestState("Uncommitted")
+       ContainerRequestStateCommitted  = ContainerRequestState("Committed")
+       ContainerRequestStateFinal      = ContainerRequestState("Final")
+)
index e6984601f40819759ee43c8dc463afa2655f64eb..eb79b5b7d208a4d16e7cb8c660efc54c1d2142a7 100644 (file)
@@ -36,7 +36,8 @@ const (
        Dispatch1Token    = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
        Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
 
-       QueuedContainerUUID = "zzzzz-dz642-queuedcontainer"
+       QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
+       QueuedContainerUUID        = "zzzzz-dz642-queuedcontainer"
 
        ArvadosRepoUUID = "zzzzz-s0uqq-arvadosrepo0123"
        ArvadosRepoName = "arvados"