kwargs["enable_reuse"] = kwargs.get("enable_reuse")
kwargs["use_container"] = True
kwargs["tmpdir_prefix"] = "tmp"
- kwargs["on_error"] = "continue"
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
if not kwargs["name"]:
else:
runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"))
+ name=kwargs.get("name"), on_error=kwargs.get("on_error"))
else:
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"))
+ name=kwargs.get("name"), on_error=kwargs.get("on_error"))
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
# Create pipeline for local run
help="Name to use for workflow execution instance.",
default=None)
+ parser.add_argument("--on-error", type=str,
+ help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
+ "Default is 'continue'.", default="continue", choices=("stop", "continue"))
+
parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
- 'arv:enable_reuse': True
+ 'arv:enable_reuse': True,
+ 'arv:on_error': 'continue'
},
'repository': 'arvados',
'script_version': 'master',
},
'state': 'Committed',
'owner_uuid': None,
- 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue',
+ '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'output_path': '/var/spool/cwl',
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_on_error(self, stubs, tm):
+ capture_stdout = cStringIO.StringIO()
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--debug", "--on-error=stop",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
+
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=JsonDiffMatcher(expect_pipeline))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_pipeline_uuid + '\n')
+
@mock.patch("time.sleep")
@stubs
logging.exception("")
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '--disable-reuse', '--on-error=continue',
+ '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
+ @stubs
+ def test_submit_container_on_error(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--on-error=stop",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=stop',
+ '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
logging.exception("")
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-name="+output_name, '--enable-reuse',
+ "--output-name="+output_name, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
logging.exception("")
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-tags="+output_tags, '--enable-reuse',
+ "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
'name': 'expect_arvworkflow.cwl#main',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
+ '--enable-reuse', '--on-error=continue',
+ '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'API': True,
'name': 'a test workflow',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
+ '--enable-reuse', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'API': True,