From: Peter Amstutz Date: Fri, 26 May 2017 18:21:01 +0000 (-0400) Subject: Merge branch 'master' into 11100-cwl-set-output-ttl X-Git-Tag: 1.1.0~225^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/4dac499d30988fd8289a40ad4128d6d471037180?hp=-c Merge branch 'master' into 11100-cwl-set-output-ttl --- 4dac499d30988fd8289a40ad4128d6d471037180 diff --combined sdk/cwl/arvados_cwl/__init__.py index fbe91b9f39,25a240ca28..43082dfb85 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@@ -74,9 -74,6 +74,9 @@@ class ArvCwlRunner(object) self.output_name = output_name self.output_tags = output_tags self.project_uuid = None + self.intermediate_output_ttl = 0 + self.intermediate_output_collections = [] + self.trash_intermediate = False if keep_client is not None: self.keep_client = keep_client @@@ -108,7 -105,8 +108,8 @@@ kwargs["fetcher_constructor"] = partial(CollectionFetcher, api_client=self.api, fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), - num_retries=self.num_retries) + num_retries=self.num_retries, + overrides=kwargs.get("override_tools")) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": @@@ -205,20 -203,6 +206,20 @@@ def add_uploaded(self, src, pair): self.uploaded[src] = pair + def add_intermediate_output(self, uuid): + if uuid: + self.intermediate_output_collections.append(uuid) + + def trash_intermediate_output(self): + logger.info("Cleaning up intermediate output collections") + for i in self.intermediate_output_collections: + try: + self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries) + except: + logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) + if sys.exc_info()[0] is KeyboardInterrupt: + break + def check_features(self, obj): if isinstance(obj, dict): if obj.get("writable"): @@@ -346,23 -330,13 +347,24 @@@ collection_cache=self.collection_cache) self.fs_access = make_fs_access(kwargs["basedir"]) + + self.trash_intermediate = kwargs["trash_intermediate"] + if self.trash_intermediate and self.work_api != "containers": + raise Exception("--trash-intermediate is only supported with --api=containers.") + + self.intermediate_output_ttl = kwargs["intermediate_output_ttl"] + if self.intermediate_output_ttl and self.work_api != "containers": + raise Exception("--intermediate-output-ttl is only supported with --api=containers.") + if self.intermediate_output_ttl < 0: + raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl) + if not kwargs.get("name"): kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. - upload_workflow_deps(self, tool) + override_tools = {} + upload_workflow_deps(self, tool, override_tools) # Reload tool object which may have been updated by # upload_workflow_deps @@@ -370,7 -344,8 +372,8 @@@ makeTool=self.arv_make_tool, loader=tool.doc_loader, avsc_names=tool.doc_schema, - metadata=tool.metadata) + metadata=tool.metadata, + override_tools=override_tools) # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % kwargs["name"], @@@ -436,8 -411,7 +439,8 @@@ submit_runner_ram=kwargs.get("submit_runner_ram"), name=kwargs.get("name"), on_error=kwargs.get("on_error"), - submit_runner_image=kwargs.get("submit_runner_image")) + submit_runner_image=kwargs.get("submit_runner_image"), + intermediate_output_ttl=kwargs.get("intermediate_output_ttl")) elif self.work_api == "jobs": runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, @@@ -541,9 -515,6 +544,9 @@@ adjustDirObjs(self.final_output, partial(get_listing, self.fs_access)) adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access)) + if self.trash_intermediate and self.final_status == "success": + self.trash_intermediate_output() + return (self.final_output, self.final_status) @@@ -590,10 -561,10 +593,10 @@@ def arg_parser(): # type: () -> argpar exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", default=True, dest="enable_reuse", - help="") + help="Enable job or container reuse (default)") exgroup.add_argument("--disable-reuse", action="store_false", default=True, dest="enable_reuse", - help="") + help="Disable job or container reuse") parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.") parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None) @@@ -626,8 -597,7 +629,8 @@@ parser.add_argument("--api", type=str, default=None, dest="work_api", - help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.") + choices=("jobs", "containers"), + help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.") parser.add_argument("--compute-checksum", action="store_true", default=False, help="Compute checksum of contents while collecting outputs", @@@ -653,18 -623,6 +656,18 @@@ help="Enable loading and running development versions " "of CWL spec.", default=False) + parser.add_argument("--intermediate-output-ttl", type=int, metavar="N", + help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).", + default=0) + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--trash-intermediate", action="store_true", + default=False, dest="trash_intermediate", + help="Immediately trash intermediate outputs on workflow success.") + exgroup.add_argument("--no-trash-intermediate", action="store_false", + default=False, dest="trash_intermediate", + help="Do not trash intermediate outputs (default).") + parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute") parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.") @@@ -682,8 -640,7 +685,8 @@@ def add_arv_hints() "http://arvados.org/cwl#RuntimeConstraints", "http://arvados.org/cwl#PartitionRequirement", "http://arvados.org/cwl#APIRequirement", - "http://commonwl.org/cwltool#LoadListingRequirement" + "http://commonwl.org/cwltool#LoadListingRequirement", + "http://arvados.org/cwl#IntermediateOutput" ]) def main(args, stdout, stderr, api_client=None, keep_client=None): diff --combined sdk/cwl/arvados_cwl/arv-cwl-schema.yml index b45378d6fe,eb26e9ef00..6838c0f1ae --- a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml +++ b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml @@@ -94,6 -94,12 +94,12 @@@ $graph doc: | Select preferred compute partitions on which to run jobs. fields: + - name: class + type: string + doc: "Always 'arv:PartitionRequirement'" + jsonldPredicate: + _id: "@type" + _type: "@vocab" - name: partition type: - string @@@ -114,32 -120,3 +120,32 @@@ jsonldPredicate: _id: "@type" _type: "@vocab" + +- name: IntermediateOutput + type: record + extends: cwl:ProcessRequirement + inVocab: false + doc: | + Specify desired handling of intermediate output collections. + fields: + class: + type: string + doc: "Always 'arv:IntermediateOutput'" + jsonldPredicate: + _id: "@type" + _type: "@vocab" + outputTTL: + type: int + doc: | + If the value is greater than zero, consider intermediate output + collections to be temporary and should be automatically + trashed. Temporary collections will be trashed `outputTTL` seconds + after creation. A value of zero means intermediate output should be + retained indefinitely (this is the default behavior). + + Note: arvados-cwl-runner currently does not take workflow dependencies + into account when setting the TTL on an intermediate output + collection. If the TTL is too short, it is possible for a collection to + be trashed before downstream steps that consume it are started. The + recommended minimum value for TTL is the expected duration of the + entire the workflow. diff --combined sdk/cwl/arvados_cwl/runner.py index 581c13351d,ad449108f8..d15d617e78 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@@ -100,7 -100,8 +100,8 @@@ def upload_dependencies(arvrunner, name mapper = ArvPathMapper(arvrunner, sc, "", "keep:%s", "keep:%s/%s", - name=name) + name=name, + single_collection=True) def setloc(p): if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")): @@@ -186,7 -187,7 +187,7 @@@ def upload_job_order(arvrunner, name, t return job_order - def upload_workflow_deps(arvrunner, tool): + def upload_workflow_deps(arvrunner, tool, override_tools): # Ensure that Docker images needed by this workflow are available upload_docker(arvrunner, tool) @@@ -203,6 -204,7 +204,7 @@@ False, include_primary=False) document_loader.idx[deptool["id"]] = deptool + override_tools[deptool["id"]] = json.dumps(deptool) tool.visit(upload_tool_deps) @@@ -246,8 -248,7 +248,8 @@@ class Runner(object) def __init__(self, runner, tool, job_order, enable_reuse, output_name, output_tags, submit_runner_ram=0, - name=None, on_error=None, submit_runner_image=None): + name=None, on_error=None, submit_runner_image=None, + intermediate_output_ttl=0): self.arvrunner = runner self.tool = tool self.job_order = job_order @@@ -260,7 -261,6 +262,7 @@@ self.name = name self.on_error = on_error self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__ + self.intermediate_output_ttl = intermediate_output_ttl if submit_runner_ram: self.submit_runner_ram = submit_runner_ram diff --combined sdk/cwl/setup.py index efed1cc72d,02c612fa90..df988f6991 --- a/sdk/cwl/setup.py +++ b/sdk/cwl/setup.py @@@ -52,9 -52,8 +52,9 @@@ setup(name='arvados-cwl-runner' 'schema-salad==2.5.20170428142041', 'typing==3.5.3.0', 'ruamel.yaml==0.13.7', - 'arvados-python-client>=0.1.20170327195441', + 'arvados-python-client>=0.1.20170526013812', - 'setuptools' + 'setuptools', + 'ciso8601' ], data_files=[ ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']), diff --combined sdk/cwl/tests/test_submit.py index 112f432579,47844c1698..2e682830b4 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@@ -34,7 -34,6 +34,6 @@@ def stubs(func) stubs.events = events stubs.keepdocker = keepdocker - def putstub(p, **kwargs): return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p)) keep_client1().put.side_effect = putstub @@@ -53,33 -52,32 +52,32 @@@ "uuid": stubs.fake_user_uuid, } stubs.api.collections().list().execute.return_value = {"items": []} - stubs.api.collections().create().execute.side_effect = ({ - "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1", - "portable_data_hash": "99999999999999999999999999999991+99", - "manifest_text": "" - }, { - "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2", - "portable_data_hash": "99999999999999999999999999999992+99", - "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt" - }, - { - "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4", - "portable_data_hash": "99999999999999999999999999999994+99", - "manifest_text": "" - }, - { - "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz5", - "portable_data_hash": "99999999999999999999999999999995+99", - "manifest_text": "" - }, - { - "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz6", - "portable_data_hash": "99999999999999999999999999999996+99", - "manifest_text": "" - } - ) - stubs.api.collections().get().execute.return_value = { - "portable_data_hash": "99999999999999999999999999999993+99", "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"} + + class CollectionExecute(object): + def __init__(self, exe): + self.exe = exe + def execute(self, num_retries=None): + return self.exe + + def collection_createstub(created_collections, body, ensure_unique_name=None): + mt = body["manifest_text"] + uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz%d" % len(created_collections) + pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt)) + created_collections[uuid] = { + "uuid": uuid, + "portable_data_hash": pdh, + "manifest_text": mt + } + return CollectionExecute(created_collections[uuid]) + + def collection_getstub(created_collections, uuid): + for v in created_collections.itervalues(): + if uuid in (v["uuid"], v["portable_data_hash"]): + return CollectionExecute(v) + + created_collections = {} + stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections) + stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections) stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" stubs.api.jobs().create().execute.return_value = { @@@ -106,7 -104,7 +104,7 @@@ 'script_parameters': { 'x': { 'basename': 'blorp.txt', - 'location': 'keep:99999999999999999999999999999992+99/blorp.txt', + 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt', 'class': 'File' }, 'y': { @@@ -123,8 -121,7 +121,7 @@@ }], 'class': 'Directory' }, - 'cwl:tool': - '99999999999999999999999999999994+99/workflow.cwl#main' + 'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main' }, 'repository': 'arvados', 'script_version': 'master', @@@ -141,12 -138,12 +138,12 @@@ 'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024}, 'script_parameters': { 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}}, - 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999992+99/blorp.txt'}}, + 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'}}, 'z': {"value": {'basename': 'anonymous', 'class': 'Directory', 'listing': [ {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'} ]}}, - 'cwl:tool': '99999999999999999999999999999994+99/workflow.cwl#main', + 'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main', 'arv:enable_reuse': True, 'arv:on_error': 'continue' }, @@@ -191,7 -188,7 +188,7 @@@ 'kind': 'json', 'content': { 'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}, - 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999992+99/blorp.txt'}, + 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'}, 'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [ {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'} ]} @@@ -245,21 -242,24 +242,24 @@@ class TestSubmit(unittest.TestCase) self.assertEqual(exited, 0) stubs.api.collections().create.assert_has_calls([ - mock.call(), mock.call(body=JsonDiffMatcher({ 'manifest_text': '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n', 'replication_desired': None, 'name': 'submit_tool.cwl dependencies', }), ensure_unique_name=True), - mock.call().execute(num_retries=4), mock.call(body=JsonDiffMatcher({ 'manifest_text': '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n', 'replication_desired': None, 'name': 'submit_wf.cwl input', }), ensure_unique_name=True), - mock.call().execute(num_retries=4)]) + mock.call(body=JsonDiffMatcher({ + 'manifest_text': + '. 61df2ed9ee3eb7dd9b799e5ca35305fa+1217 0:1217:workflow.cwl\n', + 'replication_desired': None, + 'name': 'submit_wf.cwl', + }), ensure_unique_name=True) ]) arvdock.assert_has_calls([ mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None), @@@ -428,21 -428,18 +428,18 @@@ logging.exception("") stubs.api.collections().create.assert_has_calls([ - mock.call(), mock.call(body=JsonDiffMatcher({ 'manifest_text': '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n', 'replication_desired': None, 'name': 'submit_tool.cwl dependencies', }), ensure_unique_name=True), - mock.call().execute(num_retries=4), mock.call(body=JsonDiffMatcher({ 'manifest_text': '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n', 'replication_desired': None, 'name': 'submit_wf.cwl input', - }), ensure_unique_name=True), - mock.call().execute(num_retries=4)]) + }), ensure_unique_name=True)]) expect_container = copy.deepcopy(stubs.expect_container_spec) stubs.api.container_requests().create.assert_called_with( @@@ -520,53 -517,6 +517,53 @@@ self.assertEqual(capture_stdout.getvalue(), stubs.expect_container_request_uuid + '\n') + + @stubs + def test_submit_container_output_ttl(self, stubs): + capture_stdout = cStringIO.StringIO() + try: + exited = arvados_cwl.main( + ["--submit", "--no-wait", "--api=containers", "--debug", "--intermediate-output-ttl", "3600", + "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], + capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client) + self.assertEqual(exited, 0) + except: + logging.exception("") + + expect_container = copy.deepcopy(stubs.expect_container_spec) + expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', + '--enable-reuse', '--on-error=continue', + "--intermediate-output-ttl=3600", + '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] + + stubs.api.container_requests().create.assert_called_with( + body=JsonDiffMatcher(expect_container)) + self.assertEqual(capture_stdout.getvalue(), + stubs.expect_container_request_uuid + '\n') + + @stubs + def test_submit_container_trash_intermediate(self, stubs): + capture_stdout = cStringIO.StringIO() + try: + exited = arvados_cwl.main( + ["--submit", "--no-wait", "--api=containers", "--debug", "--trash-intermediate", + "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], + capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client) + self.assertEqual(exited, 0) + except: + logging.exception("") + + expect_container = copy.deepcopy(stubs.expect_container_spec) + expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', + '--enable-reuse', '--on-error=continue', + "--trash-intermediate", + '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] + + stubs.api.container_requests().create.assert_called_with( + body=JsonDiffMatcher(expect_container)) + self.assertEqual(capture_stdout.getvalue(), + stubs.expect_container_request_uuid + '\n') + @stubs def test_submit_container_output_tags(self, stubs): output_tags = "tag0,tag1,tag2" @@@ -901,7 -851,7 +898,7 @@@ class TestCreateTemplate(unittest.TestC 'dataclass': 'File', 'required': True, 'type': 'File', - 'value': '99999999999999999999999999999992+99/blorp.txt', + 'value': '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt', } expect_component['script_parameters']['y'] = { 'dataclass': 'Collection', @@@ -1153,6 -1103,36 +1150,36 @@@ class TestCreateWorkflow(unittest.TestC self.existing_workflow_uuid + '\n') + @stubs + def test_create_collection_per_tool(self, stubs): + project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz' + + capture_stdout = cStringIO.StringIO() + + exited = arvados_cwl.main( + ["--create-workflow", "--debug", + "--api=containers", + "--project-uuid", project_uuid, + "tests/collection_per_tool/collection_per_tool.cwl"], + capture_stdout, sys.stderr, api_client=stubs.api) + self.assertEqual(exited, 0) + + expect_workflow = open("tests/collection_per_tool/collection_per_tool_packed.cwl").read() + + body = { + "workflow": { + "owner_uuid": project_uuid, + "name": "collection_per_tool.cwl", + "description": "", + "definition": expect_workflow, + } + } + stubs.api.workflows().create.assert_called_with( + body=JsonDiffMatcher(body)) + + self.assertEqual(capture_stdout.getvalue(), + stubs.expect_workflow_uuid + '\n') + class TestTemplateInputs(unittest.TestCase): expect_template = { "components": { @@@ -1163,7 -1143,7 +1190,7 @@@ }, 'script_parameters': { 'cwl:tool': - '99999999999999999999999999999992+99/workflow.cwl#main', + '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main', 'optionalFloatInput': None, 'fileInput': { 'type': 'File', @@@ -1223,8 -1203,8 +1250,8 @@@ expect_template = copy.deepcopy(self.expect_template) params = expect_template[ "components"]["inputs_test.cwl"]["script_parameters"] - params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt' - params["cwl:tool"] = '99999999999999999999999999999994+99/workflow.cwl#main' + params["fileInput"]["value"] = '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt' + params["cwl:tool"] = '6c5ee1cd606088106d9f28367cde1e41+60/workflow.cwl#main' params["floatInput"]["value"] = 1.234 params["boolInput"]["value"] = True