X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bee4fa82c15b2188c2105fff3b52c305b38f04a6..6f6ce90eb894bde190fd7522cbec037fe61fc25c:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index be1ec27820..7f4b5c7549 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -1,4 +1,7 @@ #!/usr/bin/env python +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 # Implement cwl-runner interface for submitting and running work on Arvados, using # either the Crunch jobs API or Crunch containers API. @@ -108,7 +111,9 @@ class ArvCwlRunner(object): kwargs["fetcher_constructor"] = partial(CollectionFetcher, api_client=self.api, fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), - num_retries=self.num_retries) + num_retries=self.num_retries, + overrides=kwargs.get("override_tools")) + kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": @@ -346,17 +351,24 @@ class ArvCwlRunner(object): collection_cache=self.collection_cache) self.fs_access = make_fs_access(kwargs["basedir"]) - self.intermediate_output_ttl = kwargs["intermediate_output_ttl"] + self.trash_intermediate = kwargs["trash_intermediate"] + if self.trash_intermediate and self.work_api != "containers": + raise Exception("--trash-intermediate is only supported with --api=containers.") + + self.intermediate_output_ttl = kwargs["intermediate_output_ttl"] if self.intermediate_output_ttl and self.work_api != "containers": - raise Exception("--intermediate-output-ttl is only supported when using the containers api.") + raise Exception("--intermediate-output-ttl is only supported with --api=containers.") + if self.intermediate_output_ttl < 0: + raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl) if not kwargs.get("name"): kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. - upload_workflow_deps(self, tool) + override_tools = {} + upload_workflow_deps(self, tool, override_tools) # Reload tool object which may have been updated by # upload_workflow_deps @@ -364,7 +376,8 @@ class ArvCwlRunner(object): makeTool=self.arv_make_tool, loader=tool.doc_loader, avsc_names=tool.doc_schema, - metadata=tool.metadata) + metadata=tool.metadata, + override_tools=override_tools) # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % kwargs["name"], @@ -412,14 +425,8 @@ class ArvCwlRunner(object): if kwargs.get("submit"): # Submit a runner job to run the workflow for us. if self.work_api == "containers": - if tool.tool["class"] == "CommandLineTool": + if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"): kwargs["runnerjob"] = tool.tool["id"] - upload_dependencies(self, - kwargs["name"], - tool.doc_loader, - tool.tool, - tool.tool["id"], - False) runnerjob = tool.job(job_order, self.output_callback, **kwargs).next() @@ -440,8 +447,7 @@ class ArvCwlRunner(object): name=kwargs.get("name"), on_error=kwargs.get("on_error"), submit_runner_image=kwargs.get("submit_runner_image")) - - if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs": + elif "cwl_runner_job" not in kwargs and self.work_api == "jobs": # Create pipeline for local run self.pipeline = self.api.pipeline_instances().create( body={ @@ -677,7 +683,8 @@ def add_arv_hints(): "http://arvados.org/cwl#PartitionRequirement", "http://arvados.org/cwl#APIRequirement", "http://commonwl.org/cwltool#LoadListingRequirement", - "http://arvados.org/cwl#IntermediateOutput" + "http://arvados.org/cwl#IntermediateOutput", + "http://arvados.org/cwl#ReuseRequirement" ]) def main(args, stdout, stderr, api_client=None, keep_client=None): @@ -744,6 +751,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): arvargs.use_container = True arvargs.relax_path_checks = True arvargs.validate = None + arvargs.print_supported_versions = False make_fs_access = partial(CollectionFsAccess, collection_cache=runner.collection_cache)