Merge branch '12278-cwl-debug-flag' closes #12278
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 29 Sep 2017 20:33:12 +0000 (16:33 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 29 Sep 2017 20:33:19 +0000 (16:33 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

13 files changed:
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/submit_wf_no_reuse.cwl [new file with mode: 0644]
sdk/go/crunchrunner/upload.go
sdk/go/crunchrunner/upload_test.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunch-run/upload.go
services/crunch-run/upload_test.go
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/tests/test_daemon.py

index 78086ed0cc07c4c75258d439cd0e978f12f828e9..e8e2a5113195174a45dea36e13f3f5bb78b1fc5d 100644 (file)
@@ -316,6 +316,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * self.submit_runner_ram,
                 "API": True
             },
+            "use_existing": self.enable_reuse,
             "properties": {}
         }
 
index 7fc62db9513e1793021b26b9d8d52ba405dada1f..25f64ea23065f887517c2ddba5ac728f18e856b6 100644 (file)
@@ -184,17 +184,19 @@ class ArvadosJob(object):
         if self.arvrunner.pipeline:
             self.arvrunner.pipeline["components"][self.name] = {"job": record}
             with Perf(metrics, "update_pipeline_component %s" % self.name):
-                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-                                                                                 body={
-                                                                                    "components": self.arvrunner.pipeline["components"]
-                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
+                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+                    uuid=self.arvrunner.pipeline["uuid"],
+                    body={
+                        "components": self.arvrunner.pipeline["components"]
+                    }).execute(num_retries=self.arvrunner.num_retries)
         if self.arvrunner.uuid:
             try:
                 job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
                 if job:
                     components = job["components"]
                     components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
+                    self.arvrunner.api.jobs().update(
+                        uuid=self.arvrunner.uuid,
                         body={
                             "components": components
                         }).execute(num_retries=self.arvrunner.num_retries)
@@ -328,12 +330,20 @@ class RunnerJob(Runner):
 
         del job_spec["owner_uuid"]
         job_spec["job"] = job
+
+        instance_spec = {
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": self.name,
+            "components": {
+                "cwl-runner": job_spec,
+            },
+            "state": "RunningOnServer",
+        }
+        if not self.enable_reuse:
+            instance_spec["properties"] = {"run_options": {"enable_job_reuse": False}}
+
         self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
-            body={
-                "owner_uuid": self.arvrunner.project_uuid,
-                "name": self.name,
-                "components": {"cwl-runner": job_spec },
-                "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+            body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
         logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
 
         if kwargs.get("wait") is False:
index 39497098498792d1a4bb8595a77f6dcf9d1f6d47..c55e976924f65bd362153a0921152e5a170ca47b 100644 (file)
@@ -293,6 +293,12 @@ class Runner(object):
         self.tool = tool
         self.job_order = job_order
         self.running = False
+        if enable_reuse:
+            # If reuse is permitted by command line arguments but
+            # disabled by the workflow itself, disable it.
+            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+            if reuse_req:
+                enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
         self.uuid = None
         self.final_output = None
index 03147206e467cb85311eb8641dbe4efc4e570f5e..059b47275c9207279dc3aa8e0933980268e70512 100644 (file)
@@ -245,7 +245,8 @@ def stubs(func):
                 'vcpus': 1,
                 'ram': 1024*1024*1024
             },
-            "properties": {}
+            'use_existing': True,
+            'properties': {}
         }
 
         stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
@@ -320,6 +321,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
         expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
+        expect_pipeline["properties"] = {"run_options": {"enable_job_reuse": False}}
 
         stubs.api.pipeline_instances().create.assert_called_with(
             body=JsonDiffMatcher(expect_pipeline))
@@ -495,9 +497,41 @@ class TestSubmit(unittest.TestCase):
             logging.exception("")
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
-        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  '--disable-reuse', '--on-error=continue',
-                                                  '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+        expect_container["command"] = [
+            'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+            '--disable-reuse', '--on-error=continue',
+            '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+        expect_container["use_existing"] = False
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
+    @stubs
+    def test_submit_container_reuse_disabled_by_workflow(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "tests/wf/submit_wf_no_reuse.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        self.assertEqual(exited, 0)
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = [
+            'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+            '--disable-reuse', '--on-error=continue',
+            '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+        expect_container["use_existing"] = False
+        expect_container["name"] = "submit_wf_no_reuse.cwl"
+        expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
+            {
+                "class": "http://arvados.org/cwl#ReuseRequirement",
+                "enableReuse": False,
+            },
+        ]
 
         stubs.api.container_requests().create.assert_called_with(
             body=JsonDiffMatcher(expect_container))
@@ -705,7 +739,8 @@ class TestSubmit(unittest.TestCase):
                 'vcpus': 1,
                 'ram': 1073741824
             },
-            "properties": {}
+            'use_existing': True,
+            'properties': {}
         }
 
         stubs.api.container_requests().create.assert_called_with(
@@ -820,7 +855,8 @@ class TestSubmit(unittest.TestCase):
                 'vcpus': 1,
                 'ram': 1073741824
             },
-            "properties": {
+            'use_existing': True,
+            'properties': {
                 "template_uuid": "962eh-7fd4e-gkbzl62qqtfig37"
             }
         }
diff --git a/sdk/cwl/tests/wf/submit_wf_no_reuse.cwl b/sdk/cwl/tests/wf/submit_wf_no_reuse.cwl
new file mode 100644 (file)
index 0000000..636b850
--- /dev/null
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+# Test case for arvados-cwl-runner. Disables job/container reuse.
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
+inputs:
+  - id: x
+    type: File
+  - id: y
+    type: Directory
+  - id: z
+    type: Directory
+outputs: []
+steps:
+  - id: step1
+    in:
+      - { id: x, source: "#x" }
+    out: []
+    run: ../tool/submit_tool.cwl
+hints:
+  arv:ReuseRequirement:
+    enableReuse: false
index fd24908ad6a7e9e49721c0ed33980ce096c7e34a..2848d1087c52684fe9ca5a79ec1f6f5c87b28b95 100644 (file)
@@ -9,14 +9,15 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "sort"
        "strings"
+
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 type Block struct {
@@ -90,7 +91,26 @@ type ManifestWriter struct {
 }
 
 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
-       if info.IsDir() {
+       if err != nil {
+               return err
+       }
+
+       targetPath, targetInfo := path, info
+       if info.Mode()&os.ModeSymlink != 0 {
+               // Update targetpath/info to reflect the symlink
+               // target, not the symlink itself
+               targetPath, err = filepath.EvalSymlinks(path)
+               if err != nil {
+                       return err
+               }
+               targetInfo, err = os.Stat(targetPath)
+               if err != nil {
+                       return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+               }
+       }
+
+       if targetInfo.Mode()&os.ModeType != 0 {
+               // Skip directories, pipes, other non-regular files
                return nil
        }
 
index ceb89dc26d767721ea65887a6789fcf8bb0d8983..5bc749258dea922bcc89f07d00b454ba89f05f72 100644 (file)
@@ -8,9 +8,11 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       . "gopkg.in/check.v1"
        "io/ioutil"
        "os"
+       "syscall"
+
+       . "gopkg.in/check.v1"
 )
 
 type UploadTestSuite struct{}
@@ -38,18 +40,24 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
        c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
 }
 
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreeFiles(c *C) {
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
        }()
 
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+       for _, err := range []error{
+               ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+               ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+               os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+               syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+       } {
+               c.Assert(err, IsNil)
+       }
 
        str, err := WriteTree(KeepTestClient{}, tmpdir)
        c.Check(err, IsNil)
-       c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+       c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
 }
 
 func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
index 3fdd37402fada76813dc22e69251f6144da6e358..8af12100dbe91d0cab45dca171d65b8b915f9229 100644 (file)
@@ -181,9 +181,9 @@ type ContainerRunner struct {
        networkMode   string // passed through to HostConfig.NetworkMode
 }
 
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
 // Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
        runner.SigChan = make(chan os.Signal, 1)
        signal.Notify(runner.SigChan, syscall.SIGTERM)
        signal.Notify(runner.SigChan, syscall.SIGINT)
@@ -192,7 +192,6 @@ func (runner *ContainerRunner) SetupSignals() {
        go func(sig chan os.Signal) {
                <-sig
                runner.stop()
-               signal.Stop(sig)
        }(runner.SigChan)
 }
 
@@ -213,6 +212,13 @@ func (runner *ContainerRunner) stop() {
        }
 }
 
+func (runner *ContainerRunner) teardown() {
+       if runner.SigChan != nil {
+               signal.Stop(runner.SigChan)
+               close(runner.SigChan)
+       }
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -1303,6 +1309,8 @@ func (runner *ContainerRunner) Run() (err error) {
                // a new one in case we needed to log anything while
                // finalizing.
                runner.CrunchLog.Close()
+
+               runner.teardown()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1311,7 +1319,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        // setup signal handling
-       runner.SetupSignals()
+       runner.setupSignals()
 
        // check for and/or load image
        err = runner.LoadImage()
index 935c61a1100f6400af82b39e57b0a28c1b1ae41b..474ba5d7db7b85de773443736f39dd6914bc94d2 100644 (file)
@@ -99,7 +99,7 @@ func NewTestDockerClient(exitCode int) *TestDockerClient {
        t := &TestDockerClient{}
        t.logReader, t.logWriter = io.Pipe()
        t.finish = exitCode
-       t.stop = make(chan bool)
+       t.stop = make(chan bool, 1)
        t.cwd = "/"
        return t
 }
index fa74eb05572ce25623b197419999310b9f402401..bb2776a4266342568a3eb05c755ed64d700659ec 100644 (file)
@@ -18,14 +18,15 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream
@@ -265,8 +266,26 @@ type WalkUpload struct {
 // WalkFunc walks a directory tree, uploads each file found and adds it to the
 // CollectionWriter.
 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+       if err != nil {
+               return err
+       }
+
+       targetPath, targetInfo := path, info
+       if info.Mode()&os.ModeSymlink != 0 {
+               // Update targetpath/info to reflect the symlink
+               // target, not the symlink itself
+               targetPath, err = filepath.EvalSymlinks(path)
+               if err != nil {
+                       return err
+               }
+               targetInfo, err = os.Stat(targetPath)
+               if err != nil {
+                       return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+               }
+       }
 
-       if info.IsDir() {
+       if targetInfo.Mode()&os.ModeType != 0 {
+               // Skip directories, pipes, other non-regular files
                return nil
        }
 
index 86ad1b32ae074b3acfd577e164c174dd099099cd..96ea2b119094f37b36c5711280abe2d975171d4e 100644 (file)
@@ -5,11 +5,13 @@
 package main
 
 import (
-       . "gopkg.in/check.v1"
        "io/ioutil"
        "log"
        "os"
        "sync"
+       "syscall"
+
+       . "gopkg.in/check.v1"
 )
 
 type UploadTestSuite struct{}
@@ -31,20 +33,26 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
        c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
 }
 
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
        }()
 
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+       for _, err := range []error{
+               ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+               ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+               os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+               syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+       } {
+               c.Assert(err, IsNil)
+       }
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
-       c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+       c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
 }
 
 func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
index e476e5e3e21c07205144deb6226e54c9762f8c58..d8087a1dc1cf0d7472acc1c19e429dd36c72dacb 100644 (file)
@@ -498,8 +498,19 @@ class NodeManagerDaemonActor(actor_class):
         except pykka.ActorDeadError:
             return
         cloud_node_id = cloud_node.id
-        record = self.cloud_nodes[cloud_node_id]
-        shutdown_actor.stop()
+
+        try:
+            shutdown_actor.stop()
+        except pykka.ActorDeadError:
+            pass
+
+        try:
+            record = self.cloud_nodes[cloud_node_id]
+        except KeyError:
+            # Cloud node was already removed from the cloud node list
+            # supposedly while the destroy_node call was finishing its
+            # job.
+            return
         record.shutdown_actor = None
 
         if not success:
index 1efa1ffeb35199c251d13e217f2cb37c146c4622..d6820803a6524c69a1de608aa2b5bcc8b02848b7 100644 (file)
@@ -77,8 +77,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.arv_factory = mock.MagicMock(name='arvados_mock')
         api_client = mock.MagicMock(name='api_client')
-        api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
-                                                           testutil.arvados_node_mock(2)]
+        api_client.nodes().create().execute.side_effect = \
+            [testutil.arvados_node_mock(1),
+             testutil.arvados_node_mock(2)]
         self.arv_factory.return_value = api_client
 
         self.cloud_factory = mock.MagicMock(name='cloud_mock')
@@ -400,6 +401,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_setup.start.called,
                         "second node not started after booted node stopped")
 
+    def test_node_disappearing_during_shutdown(self):
+        cloud_node = testutil.cloud_node_mock(6)
+        setup = self.start_node_boot(cloud_node, id_num=6)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.update_server_wishlist([])
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertShutdownCancellable(True)
+        shutdown = self.node_shutdown.start().proxy()
+        shutdown.cloud_node.get.return_value = cloud_node
+        # Simulate a successful but slow node destroy call: the cloud node
+        # list gets updated before the ShutdownActor finishes.
+        record = self.daemon.cloud_nodes.get().nodes.values()[0]
+        self.assertTrue(record.shutdown_actor is not None)
+        self.daemon.cloud_nodes.get().nodes.clear()
+        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+        self.assertTrue(
+            record.shutdown_actor is not None,
+            "test was ineffective -- failed to simulate the race condition")
+
     def test_booted_node_shut_down_when_never_listed(self):
         setup = self.start_node_boot()
         self.cloud_factory().node_start_time.return_value = time.time() - 3601