10793: Bump cwltool pin for workflow engine improvements. Propagate on-error
behavior from command line to workflow runner job.

author Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 5 Jan 2017 14:59:53 +0000 (09:59 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 5 Jan 2017 14:59:53 +0000 (09:59 -0500)

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_submit.py

index 6f320886453c0bff25d681627683b55adab78a6a..850aa880ecbbd131b749f92e2f16686b98460a17 100644 (file)
@@ -358,7 +358,6 @@ class ArvCwlRunner(object):
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
         kwargs["use_container"] = True
         kwargs["tmpdir_prefix"] = "tmp"
-        kwargs["on_error"] = "continue"
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if not kwargs["name"]:
@@ -387,11 +386,11 @@ class ArvCwlRunner(object):
                 else:
                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                 self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                                name=kwargs.get("name"))
+                                                name=kwargs.get("name"), on_error=kwargs.get("on_error"))
             else:
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                       self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs.get("name"))
+                                      name=kwargs.get("name"), on_error=kwargs.get("on_error"))
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
@@ -581,6 +580,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Name to use for workflow execution instance.",
                         default=None)
 
+    parser.add_argument("--on-error", type=str,
+                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
+                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
index d6e3098ab4800fa8bab4d735046cefe280b5f9bb..30c163d20ce6d0e71cd644816cd0beb7db7d77de 100644 (file)
@@ -260,6 +260,9 @@ class RunnerContainer(Runner):
         else:
             command.append("--disable-reuse")
 
+        if self.on_error:
+            command.append("--on-error=" + self.on_error)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
index 780d6bd5ac86ac3c51e4089e8a741cdc8a9efc2b..08374a46e6e3efe3d3e53e8665b919da40c58df5 100644 (file)
@@ -252,6 +252,9 @@ class RunnerJob(Runner):
 
         self.job_order["arv:enable_reuse"] = self.enable_reuse
 
+        if self.on_error:
+            self.job_order["arv:on_error"] = self.on_error
+
         return {
             "script": "cwl-runner",
             "script_version": "master",
index 4b1b42843b67ae8ee2efb372186f54043355aa1d..71c65bca88c17e1196b7f9280646be7b83bea500 100644 (file)
@@ -79,6 +79,10 @@ def run():
             enable_reuse = job_order_object["arv:enable_reuse"]
             del job_order_object["arv:enable_reuse"]
 
+        if "arv:on_error" in job_order_object:
+            on_error = job_order_object["arv:on_error"]
+            del job_order_object["arv:on_error"]
+
         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                           output_name=output_name, output_tags=output_tags)
 
@@ -87,6 +91,7 @@ def run():
         args = argparse.Namespace()
         args.project_uuid = arvados.current_job()["owner_uuid"]
         args.enable_reuse = enable_reuse
+        args.on_error = on_error
         args.submit = False
         args.debug = False
         args.quiet = False
index 144fd53ffb0324a12061c03e318c4a408844bf6c..1c3625e26bb1345673b31af5377d7b9d5282a10b 100644 (file)
@@ -167,7 +167,7 @@ def arvados_jobs_image(arvrunner):
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
-                 name=None):
+                 name=None, on_error=None):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
@@ -178,6 +178,7 @@ class Runner(object):
         self.output_name = output_name
         self.output_tags = output_tags
         self.name = name
+        self.on_error = on_error
 
         if submit_runner_ram:
             self.submit_runner_ram = submit_runner_ram
index 4d7caf0aca0b68a521d442f79b173a57084945d6..61b8f0b952773064c03c18583e26e4aaf4146dd4 100644 (file)
@@ -48,7 +48,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20161227200419',
+          'cwltool==1.0.20170105144051',
           'schema-salad==2.1.20161227191302',
           'ruamel.yaml==0.13.7',
           'arvados-python-client>=0.1.20160826210445',
index cdeb6dfd8edabe816565a081784a564f32510fcc..0ee90c8c74befff6aea223b8cc6c1f4cbaf8909c 100644 (file)
@@ -145,7 +145,8 @@ def stubs(func):
                                   {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
                               ]}},
                         'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
-                        'arv:enable_reuse': True
+                        'arv:enable_reuse': True,
+                        'arv:on_error': 'continue'
                     },
                     'repository': 'arvados',
                     'script_version': 'master',
@@ -195,7 +196,9 @@ def stubs(func):
             },
             'state': 'Committed',
             'owner_uuid': None,
-            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                        '--enable-reuse', '--on-error=continue',
+                        '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'output_path': '/var/spool/cwl',
@@ -285,6 +288,24 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_pipeline_uuid + '\n')
 
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_on_error(self, stubs, tm):
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--debug", "--on-error=stop",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
+
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=JsonDiffMatcher(expect_pipeline))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_pipeline_uuid + '\n')
+
 
     @mock.patch("time.sleep")
     @stubs
@@ -447,7 +468,31 @@ class TestSubmit(unittest.TestCase):
             logging.exception("")
 
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+                                                  '--disable-reuse', '--on-error=continue',
+                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
+    @stubs
+    def test_submit_container_on_error(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--on-error=stop",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                                  '--enable-reuse', '--on-error=stop',
+                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -470,7 +515,7 @@ class TestSubmit(unittest.TestCase):
             logging.exception("")
 
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  "--output-name="+output_name, '--enable-reuse',
+                                                  "--output-name="+output_name, '--enable-reuse', '--on-error=continue',
                                                   '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -494,7 +539,7 @@ class TestSubmit(unittest.TestCase):
             logging.exception("")
 
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  "--output-tags="+output_tags, '--enable-reuse',
+                                                  "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
                                                   '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -577,7 +622,8 @@ class TestSubmit(unittest.TestCase):
             'name': 'expect_arvworkflow.cwl#main',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
+                        '--enable-reuse', '--on-error=continue',
+                        '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,
@@ -665,7 +711,8 @@ class TestSubmit(unittest.TestCase):
             'name': 'a test workflow',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
+                        '--enable-reuse', '--on-error=continue',
+                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,