self.output_name = output_name
self.output_tags = output_tags
self.project_uuid = None
+ self.intermediate_output_ttl = 0
+ self.intermediate_output_collections = []
+ self.trash_intermediate = False
if keep_client is not None:
self.keep_client = keep_client
kwargs["fetcher_constructor"] = partial(CollectionFetcher,
api_client=self.api,
fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ overrides=kwargs.get("override_tools"))
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
def add_uploaded(self, src, pair):
self.uploaded[src] = pair
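+
+ # Track intermediate output collections so they can be trashed after a
+ # successful run when --trash-intermediate is in effect.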
+ def add_intermediate_output(self, uuid):
+ if uuid:
+ self.intermediate_output_collections.append(uuid)
+
+ def trash_intermediate_output(self):
+ logger.info("Cleaning up intermediate output collections")
+ for i in self.intermediate_output_collections:
+ try:
+ self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+ except KeyboardInterrupt:
+ break
+ except Exception as e:
+ logger.warn("Failed to delete intermediate output: %s", e, exc_info=(e if self.debug else False))
+
def check_features(self, obj):
if isinstance(obj, dict):
if obj.get("writable"):
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
+
+ self.trash_intermediate = kwargs["trash_intermediate"]
+ if self.trash_intermediate and self.work_api != "containers":
+ raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+ self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+ if self.intermediate_output_ttl and self.work_api != "containers":
+ raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+ if self.intermediate_output_ttl < 0:
+ raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- upload_workflow_deps(self, tool)
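+ # upload_workflow_deps fills override_tools with the per-tool updates
+ # (e.g. keep: references for uploaded dependencies); the mapping is
+ # passed back in below when the tool document is reloaded.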
+ override_tools = {}
+ upload_workflow_deps(self, tool, override_tools)
# Reload tool object which may have been updated by
# upload_workflow_deps
makeTool=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
- metadata=tool.metadata)
+ metadata=tool.metadata,
+ override_tools=override_tools)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % kwargs["name"],
submit_runner_ram=kwargs.get("submit_runner_ram"),
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"))
+ submit_runner_image=kwargs.get("submit_runner_image"),
+ intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
self.output_name,
adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
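+ # On success, clean up intermediate output collections if the user asked
+ # for it; the final output collection is left alone.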
+ if self.trash_intermediate and self.final_status == "success":
+ self.trash_intermediate_output()
+
return (self.final_output, self.final_status)
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
default=True, dest="enable_reuse",
- help="")
+ help="Enable job or container reuse (default)")
exgroup.add_argument("--disable-reuse", action="store_false",
default=True, dest="enable_reuse",
- help="")
+ help="Disable job or container reuse")
parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
parser.add_argument("--api", type=str,
default=None, dest="work_api",
- help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
+ choices=("jobs", "containers"),
+ help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
parser.add_argument("--compute-checksum", action="store_true", default=False,
help="Compute checksum of contents while collecting outputs",
help="Enable loading and running development versions "
"of CWL spec.", default=False)
+ parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
+ help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
+ default=0)
+
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--trash-intermediate", action="store_true",
+ default=False, dest="trash_intermediate",
+ help="Immediately trash intermediate outputs on workflow success.")
+ exgroup.add_argument("--no-trash-intermediate", action="store_false",
+ default=False, dest="trash_intermediate",
+ help="Do not trash intermediate outputs (default).")
+
parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
"http://arvados.org/cwl#RuntimeConstraints",
"http://arvados.org/cwl#PartitionRequirement",
"http://arvados.org/cwl#APIRequirement",
- "http://commonwl.org/cwltool#LoadListingRequirement"
+ "http://commonwl.org/cwltool#LoadListingRequirement",
+ "http://arvados.org/cwl#IntermediateOutput"
])
def main(args, stdout, stderr, api_client=None, keep_client=None):
stubs.events = events
stubs.keepdocker = keepdocker
-
def putstub(p, **kwargs):
return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
keep_client1().put.side_effect = putstub
"uuid": stubs.fake_user_uuid,
}
stubs.api.collections().list().execute.return_value = {"items": []}
- stubs.api.collections().create().execute.side_effect = ({
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
- "portable_data_hash": "99999999999999999999999999999991+99",
- "manifest_text": ""
- }, {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
- "portable_data_hash": "99999999999999999999999999999992+99",
- "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"
- },
- {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4",
- "portable_data_hash": "99999999999999999999999999999994+99",
- "manifest_text": ""
- },
- {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz5",
- "portable_data_hash": "99999999999999999999999999999995+99",
- "manifest_text": ""
- },
- {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz6",
- "portable_data_hash": "99999999999999999999999999999996+99",
- "manifest_text": ""
- }
- )
- stubs.api.collections().get().execute.return_value = {
- "portable_data_hash": "99999999999999999999999999999993+99", "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"}
+
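+ # Wraps a canned API response; stubbed collections().create()/get() calls
+ # return one of these so callers can invoke .execute() as usual.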
+ class CollectionExecute(object):
+ def __init__(self, exe):
+ self.exe = exe
+ def execute(self, num_retries=None):
+ return self.exe
+
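+ # Stub for collections().create(): records the new collection and derives
+ # a fake portable data hash from the manifest text, so expected PDHs track
+ # the content being uploaded instead of hardcoded placeholders.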
+ def collection_createstub(created_collections, body, ensure_unique_name=None):
+ mt = body["manifest_text"]
+ uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz%d" % len(created_collections)
+ pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt))
+ created_collections[uuid] = {
+ "uuid": uuid,
+ "portable_data_hash": pdh,
+ "manifest_text": mt
+ }
+ return CollectionExecute(created_collections[uuid])
+
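+ # Stub for collections().get(): looks up a recorded collection by uuid or
+ # portable data hash.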
+ def collection_getstub(created_collections, uuid):
+ for v in created_collections.itervalues():
+ if uuid in (v["uuid"], v["portable_data_hash"]):
+ return CollectionExecute(v)
+
+ created_collections = {}
+ stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
+ stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
stubs.api.jobs().create().execute.return_value = {
'script_parameters': {
'x': {
'basename': 'blorp.txt',
- 'location': 'keep:99999999999999999999999999999992+99/blorp.txt',
+ 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
'class': 'File'
},
'y': {
}],
'class': 'Directory'
},
- 'cwl:tool':
- '99999999999999999999999999999994+99/workflow.cwl#main'
+ 'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main'
},
'repository': 'arvados',
'script_version': 'master',
'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
'script_parameters': {
'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999992+99/blorp.txt'}},
+ 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'}},
'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': '99999999999999999999999999999994+99/workflow.cwl#main',
+ 'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
'kind': 'json',
'content': {
'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
- 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999992+99/blorp.txt'},
+ 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'},
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}
self.assertEqual(exited, 0)
stubs.api.collections().create.assert_has_calls([
- mock.call(),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
'name': 'submit_tool.cwl dependencies',
}), ensure_unique_name=True),
- mock.call().execute(num_retries=4),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
'name': 'submit_wf.cwl input',
}), ensure_unique_name=True),
- mock.call().execute(num_retries=4)])
+ mock.call(body=JsonDiffMatcher({
+ 'manifest_text':
+ '. 61df2ed9ee3eb7dd9b799e5ca35305fa+1217 0:1217:workflow.cwl\n',
+ 'replication_desired': None,
+ 'name': 'submit_wf.cwl',
+ }), ensure_unique_name=True)])
arvdock.assert_has_calls([
mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
logging.exception("")
stubs.api.collections().create.assert_has_calls([
- mock.call(),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
'name': 'submit_tool.cwl dependencies',
}), ensure_unique_name=True),
- mock.call().execute(num_retries=4),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
'name': 'submit_wf.cwl input',
- }), ensure_unique_name=True),
- mock.call().execute(num_retries=4)])
+ }), ensure_unique_name=True)])
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+
+ @stubs
+ def test_submit_container_output_ttl(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--intermediate-output-ttl", "3600",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue',
+ "--intermediate-output-ttl=3600",
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+ @stubs
+ def test_submit_container_trash_intermediate(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--trash-intermediate",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ '--enable-reuse', '--on-error=continue',
+ "--trash-intermediate",
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
@stubs
def test_submit_container_output_tags(self, stubs):
output_tags = "tag0,tag1,tag2"
'dataclass': 'File',
'required': True,
'type': 'File',
- 'value': '99999999999999999999999999999992+99/blorp.txt',
+ 'value': '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
}
expect_component['script_parameters']['y'] = {
'dataclass': 'Collection',
self.existing_workflow_uuid + '\n')
+ @stubs
+ def test_create_collection_per_tool(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--create-workflow", "--debug",
+ "--api=containers",
+ "--project-uuid", project_uuid,
+ "tests/collection_per_tool/collection_per_tool.cwl"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ expect_workflow = open("tests/collection_per_tool/collection_per_tool_packed.cwl").read()
+
+ body = {
+ "workflow": {
+ "owner_uuid": project_uuid,
+ "name": "collection_per_tool.cwl",
+ "description": "",
+ "definition": expect_workflow,
+ }
+ }
+ stubs.api.workflows().create.assert_called_with(
+ body=JsonDiffMatcher(body))
+
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_workflow_uuid + '\n')
+
class TestTemplateInputs(unittest.TestCase):
expect_template = {
"components": {
},
'script_parameters': {
'cwl:tool':
- '99999999999999999999999999999992+99/workflow.cwl#main',
+ '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
expect_template = copy.deepcopy(self.expect_template)
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
- params["cwl:tool"] = '99999999999999999999999999999994+99/workflow.cwl#main'
+ params["fileInput"]["value"] = '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'
+ params["cwl:tool"] = '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main'
params["floatInput"]["value"] = 1.234
params["boolInput"]["value"] = True