X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5cd99e52e342db19c1a77369a5a8c027ee04feaf..80459d52161120ae8e33da140984d596271d5195:/sdk/cwl/arvados_cwl/__init__.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index d006629b9b..64ec4e2ef2 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -32,7 +32,7 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from ._version import __version__
@@ -74,12 +74,17 @@ class ArvCwlRunner(object):
         self.output_name = output_name
         self.output_tags = output_tags
         self.project_uuid = None
+        self.intermediate_output_ttl = 0
+        self.intermediate_output_collections = []
+        self.trash_intermediate = False
 
         if keep_client is not None:
             self.keep_client = keep_client
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
@@ -102,7 +107,9 @@ class ArvCwlRunner(object):
         kwargs["work_api"] = self.work_api
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
-                                                keep_client=self.keep_client)
+                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                                num_retries=self.num_retries,
+                                                overrides=kwargs.get("override_tools"))
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -199,16 +206,24 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def add_intermediate_output(self, uuid):
+        if uuid:
+            self.intermediate_output_collections.append(uuid)
+
+    def trash_intermediate_output(self):
+        logger.info("Cleaning up intermediate output collections")
+        for i in self.intermediate_output_collections:
+            try:
+                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+            except:
+                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if sys.exc_info()[0] is KeyboardInterrupt:
+                break
+
     def check_features(self, obj):
         if isinstance(obj, dict):
             if obj.get("writable"):
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
-            if obj.get("class") == "CommandLineTool":
-                if self.work_api == "containers":
-                    if obj.get("stdin"):
-                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
-                    if obj.get("stderr"):
-                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
                     # TODO: can be supported by containers API, but not jobs API.
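
The hunks above route every collection lookup through a single CollectionCache owned by ArvCwlRunner: the constructor builds it once, and arv_make_tool hands it to CollectionFetcher (via a CollectionFsAccess bound to that cache) instead of passing raw API and Keep clients around. A minimal sketch of the caching idea follows; the real class lives in arvados_cwl/fsaccess.py and is not shown in this diff, so the internals here (a dict guarded by a lock) are illustrative assumptions, while the constructor arguments and get() method mirror how the diff calls the class:

    # Illustrative sketch only: the real CollectionCache is defined in
    # arvados_cwl/fsaccess.py.  The dict-plus-lock internals are assumptions;
    # the constructor signature and get() match how this diff uses the class.
    import threading
    import arvados.collection

    class SketchCollectionCache(object):
        def __init__(self, api_client, keep_client, num_retries):
            self.api_client = api_client
            self.keep_client = keep_client
            self.num_retries = num_retries
            self.lock = threading.Lock()
            self.collections = {}  # locator -> CollectionReader

        def get(self, locator):
            # Construct a CollectionReader on first use, then reuse it, so
            # repeated references to one collection cost a single fetch.
            with self.lock:
                if locator not in self.collections:
                    self.collections[locator] = arvados.collection.CollectionReader(
                        locator,
                        api_client=self.api_client,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                return self.collections[locator]

Sharing one cache between the runner, the fetcher, and the fs-access layer means a workflow that references the same collection from many steps fetches its manifest once.
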
@@ -237,7 +252,6 @@ class ArvCwlRunner(object):
                                               keep_client=self.keep_client,
                                               num_retries=self.num_retries)
 
-        srccollections = {}
         for k,v in generatemapper.items():
             if k.startswith("_:"):
                 if v.type == "Directory":
@@ -251,20 +265,13 @@
                 raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
-            if srccollection not in srccollections:
-                try:
-                    srccollections[srccollection] = arvados.collection.CollectionReader(
-                        srccollection,
-                        api_client=self.api,
-                        keep_client=self.keep_client,
-                        num_retries=self.num_retries)
-                except arvados.errors.ArgumentError as e:
-                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
-                    raise
-            reader = srccollections[srccollection]
             try:
+                reader = self.collection_cache.get(srccollection)
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except arvados.errors.ArgumentError as e:
+                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                raise
             except IOError as e:
                 logger.warn("While preparing output collection: %s", e)
 
@@ -337,16 +344,27 @@
         self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
-                                                                 api_client=self.api,
-                                                                 keep_client=self.keep_client)
+                                                                 collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
+        self.trash_intermediate = kwargs["trash_intermediate"]
+        if self.trash_intermediate and self.work_api != "containers":
+            raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+        if self.intermediate_output_ttl and self.work_api != "containers":
+            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+        if self.intermediate_output_ttl < 0:
+            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+
         if not kwargs.get("name"):
             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        upload_workflow_deps(self, tool)
+        override_tools = {}
+        upload_workflow_deps(self, tool, override_tools)
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
@@ -354,7 +372,8 @@
                                   makeTool=self.arv_make_tool,
                                   loader=tool.doc_loader,
                                   avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata)
+                                  metadata=tool.metadata,
+                                  override_tools=override_tools)
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % kwargs["name"],
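
In the rewritten make_output_collection loop above, each output key k is a keep reference of the form keep:<portable_data_hash>/<path>; sp[0][5:] drops the five-character "keep:" prefix and the rest of the split selects the path inside the collection. A short worked example, using a made-up portable data hash:

    # Worked example of the key parsing in make_output_collection above.
    # The portable data hash is made up for illustration.
    k = "keep:99999999999999999999999999999999+99/subdir/file.txt"
    sp = k.split("/")          # ['keep:99999999999999999999999999999999+99', 'subdir', 'file.txt']
    srccollection = sp[0][5:]  # '99999999999999999999999999999999+99'  ('keep:' prefix removed)
    srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."  # 'subdir/file.txt'

Folding the collection_cache.get() call into the try block lets the ArgumentError handler cover malformed locators while the IOError handler covers copy failures, and the function-local srccollections dict disappears in favor of the shared cache.
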
@@ -420,7 +439,8 @@
                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
                                         name=kwargs.get("name"),
                                         on_error=kwargs.get("on_error"),
-                                        submit_runner_image=kwargs.get("submit_runner_image"))
+                                        submit_runner_image=kwargs.get("submit_runner_image"),
+                                        intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
         elif self.work_api == "jobs":
             runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                   self.output_name,
@@ -429,8 +449,7 @@
                                   name=kwargs.get("name"),
                                   on_error=kwargs.get("on_error"),
                                   submit_runner_image=kwargs.get("submit_runner_image"))
-
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
@@ -524,6 +543,9 @@
 
             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
+        if self.trash_intermediate and self.final_status == "success":
+            self.trash_intermediate_output()
+
         return (self.final_output, self.final_status)
@@ -551,7 +573,12 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                         type=float,
                         default=20)
-    parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--print-dot", action="store_true",
+                         help="Print workflow visualization in graphviz format and exit")
+    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
@@ -565,10 +592,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
-                        help="")
+                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
-                        help="")
+                        help="Disable job or container reuse")
 
     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
@@ -601,7 +628,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--api", type=str,
                         default=None, dest="work_api",
-                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
+                        choices=("jobs", "containers"),
+                        help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
 
     parser.add_argument("--compute-checksum", action="store_true", default=False,
                         help="Compute checksum of contents while collecting outputs",
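
Adding choices to --api moves validation of the flag into argparse itself, so a bad value now fails at parse time with a clear message instead of surfacing later as an unknown work API. A standalone sketch of the same pattern; the demo value 'batch' is made up:

    # Standalone sketch of the `choices` pattern used for --api above.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--api", type=str, default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.")

    print(parser.parse_args(["--api", "containers"]).work_api)  # containers
    # parser.parse_args(["--api", "batch"]) exits with:
    #   error: argument --api: invalid choice: 'batch' (choose from 'jobs', 'containers')
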
" "Default is 'continue'.", default="continue", choices=("stop", "continue")) + parser.add_argument("--enable-dev", action="store_true", + help="Enable loading and running development versions " + "of CWL spec.", default=False) + + parser.add_argument("--intermediate-output-ttl", type=int, metavar="N", + help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).", + default=0) + + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--trash-intermediate", action="store_true", + default=False, dest="trash_intermediate", + help="Immediately trash intermediate outputs on workflow success.") + exgroup.add_argument("--no-trash-intermediate", action="store_false", + default=False, dest="trash_intermediate", + help="Do not trash intermediate outputs (default).") + parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute") parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.") @@ -630,6 +674,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser def add_arv_hints(): cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") + cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml') use_custom_schema("v1.0", "http://arvados.org/cwl", res.read()) res.close() @@ -639,10 +684,11 @@ def add_arv_hints(): "http://arvados.org/cwl#RuntimeConstraints", "http://arvados.org/cwl#PartitionRequirement", "http://arvados.org/cwl#APIRequirement", - "http://commonwl.org/cwltool#LoadListingRequirement" + "http://commonwl.org/cwltool#LoadListingRequirement", + "http://arvados.org/cwl#IntermediateOutput", + "http://arvados.org/cwl#ReuseRequirement" ]) - def main(args, stdout, stderr, api_client=None, keep_client=None): parser = arg_parser() @@ -708,6 +754,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): arvargs.relax_path_checks = True arvargs.validate = None + make_fs_access = partial(CollectionFsAccess, + collection_cache=runner.collection_cache) + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr, @@ -715,12 +764,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): makeTool=runner.arv_make_tool, versionfunc=versionstring, job_order_object=job_order_object, - make_fs_access=partial(CollectionFsAccess, - api_client=api_client, - keep_client=keep_client), + make_fs_access=make_fs_access, fetcher_constructor=partial(CollectionFetcher, api_client=api_client, - keep_client=keep_client, + fs_access=make_fs_access(""), num_retries=runner.num_retries), resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries), logger_handler=arvados.log_handler,