X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1d77dec0f7bc1cca34d52288bd39ecd0a79be250..ba15fa5da21f4bafd3f90a8d259ea2aae764c77e:/sdk/cwl/arvados_cwl/__init__.py?ds=sidebyside diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 25a240ca28..d15acf767f 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -1,4 +1,7 @@ #!/usr/bin/env python +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 # Implement cwl-runner interface for submitting and running work on Arvados, using # either the Crunch jobs API or Crunch containers API. @@ -74,6 +77,9 @@ class ArvCwlRunner(object): self.output_name = output_name self.output_tags = output_tags self.project_uuid = None + self.intermediate_output_ttl = 0 + self.intermediate_output_collections = [] + self.trash_intermediate = False if keep_client is not None: self.keep_client = keep_client @@ -107,6 +113,7 @@ class ArvCwlRunner(object): fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), num_retries=self.num_retries, overrides=kwargs.get("override_tools")) + kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": @@ -203,6 +210,20 @@ class ArvCwlRunner(object): def add_uploaded(self, src, pair): self.uploaded[src] = pair + def add_intermediate_output(self, uuid): + if uuid: + self.intermediate_output_collections.append(uuid) + + def trash_intermediate_output(self): + logger.info("Cleaning up intermediate output collections") + for i in self.intermediate_output_collections: + try: + self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries) + except: + logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False)) + if sys.exc_info()[0] is KeyboardInterrupt: + break + def check_features(self, obj): if isinstance(obj, dict): if obj.get("writable"): @@ -216,7 +237,7 @@ class ArvCwlRunner(object): self.check_features(v) elif isinstance(obj, list): for i,v in enumerate(obj): - with SourceLine(obj, i, UnsupportedRequirement): + with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)): self.check_features(v) def make_output_collection(self, name, tagsString, outputObj): @@ -260,7 +281,7 @@ class ArvCwlRunner(object): def rewrite(fileobj): fileobj["location"] = generatemapper.mapper(fileobj["location"]).target - for k in ("basename", "listing", "contents"): + for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"): if k in fileobj: del fileobj[k] @@ -330,6 +351,17 @@ class ArvCwlRunner(object): collection_cache=self.collection_cache) self.fs_access = make_fs_access(kwargs["basedir"]) + + self.trash_intermediate = kwargs["trash_intermediate"] + if self.trash_intermediate and self.work_api != "containers": + raise Exception("--trash-intermediate is only supported with --api=containers.") + + self.intermediate_output_ttl = kwargs["intermediate_output_ttl"] + if self.intermediate_output_ttl and self.work_api != "containers": + raise Exception("--intermediate-output-ttl is only supported with --api=containers.") + if self.intermediate_output_ttl < 0: + raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl) + if not kwargs.get("name"): kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) @@ -393,14 +425,8 @@ class ArvCwlRunner(object): if kwargs.get("submit"): # Submit a runner job to run the workflow for us. if self.work_api == "containers": - if tool.tool["class"] == "CommandLineTool": + if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"): kwargs["runnerjob"] = tool.tool["id"] - upload_dependencies(self, - kwargs["name"], - tool.doc_loader, - tool.tool, - tool.tool["id"], - False) runnerjob = tool.job(job_order, self.output_callback, **kwargs).next() @@ -411,7 +437,8 @@ class ArvCwlRunner(object): submit_runner_ram=kwargs.get("submit_runner_ram"), name=kwargs.get("name"), on_error=kwargs.get("on_error"), - submit_runner_image=kwargs.get("submit_runner_image")) + submit_runner_image=kwargs.get("submit_runner_image"), + intermediate_output_ttl=kwargs.get("intermediate_output_ttl")) elif self.work_api == "jobs": runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, @@ -420,8 +447,7 @@ class ArvCwlRunner(object): name=kwargs.get("name"), on_error=kwargs.get("on_error"), submit_runner_image=kwargs.get("submit_runner_image")) - - if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs": + elif "cwl_runner_job" not in kwargs and self.work_api == "jobs": # Create pipeline for local run self.pipeline = self.api.pipeline_instances().create( body={ @@ -515,6 +541,9 @@ class ArvCwlRunner(object): adjustDirObjs(self.final_output, partial(get_listing, self.fs_access)) adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access)) + if self.trash_intermediate and self.final_status == "success": + self.trash_intermediate_output() + return (self.final_output, self.final_status) @@ -561,10 +590,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--enable-reuse", action="store_true", default=True, dest="enable_reuse", - help="") + help="Enable job or container reuse (default)") exgroup.add_argument("--disable-reuse", action="store_false", default=True, dest="enable_reuse", - help="") + help="Disable job or container reuse") parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.") parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None) @@ -597,7 +626,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser parser.add_argument("--api", type=str, default=None, dest="work_api", - help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.") + choices=("jobs", "containers"), + help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.") parser.add_argument("--compute-checksum", action="store_true", default=False, help="Compute checksum of contents while collecting outputs", @@ -623,6 +653,18 @@ def arg_parser(): # type: () -> argparse.ArgumentParser help="Enable loading and running development versions " "of CWL spec.", default=False) + parser.add_argument("--intermediate-output-ttl", type=int, metavar="N", + help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).", + default=0) + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--trash-intermediate", action="store_true", + default=False, dest="trash_intermediate", + help="Immediately trash intermediate outputs on workflow success.") + exgroup.add_argument("--no-trash-intermediate", action="store_false", + default=False, dest="trash_intermediate", + help="Do not trash intermediate outputs (default).") + parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute") parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.") @@ -640,7 +682,9 @@ def add_arv_hints(): "http://arvados.org/cwl#RuntimeConstraints", "http://arvados.org/cwl#PartitionRequirement", "http://arvados.org/cwl#APIRequirement", - "http://commonwl.org/cwltool#LoadListingRequirement" + "http://commonwl.org/cwltool#LoadListingRequirement", + "http://arvados.org/cwl#IntermediateOutput", + "http://arvados.org/cwl#ReuseRequirement" ]) def main(args, stdout, stderr, api_client=None, keep_client=None): @@ -706,7 +750,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): arvargs.conformance_test = None arvargs.use_container = True arvargs.relax_path_checks = True - arvargs.validate = None + arvargs.print_supported_versions = False make_fs_access = partial(CollectionFsAccess, collection_cache=runner.collection_cache)