#!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.
kwargs["fetcher_constructor"] = partial(CollectionFetcher,
api_client=self.api,
fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ overrides=kwargs.get("override_tools"))
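# The overrides map (filled in by upload_workflow_deps) lets the fetcher return
# locally updated tool documents instead of re-fetching them from Keep or the API.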
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
- self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+
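# --trash-intermediate asks for intermediate output collections to be trashed
# as soon as the workflow finishes successfully.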
self.trash_intermediate = kwargs["trash_intermediate"]
+ if self.trash_intermediate and self.work_api != "containers":
+ raise Exception("--trash-intermediate is only supported with --api=containers.")
+
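# --intermediate-output-ttl N marks intermediate output collections to be
# trashed N seconds after creation; 0 means keep them.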
+ self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
if self.intermediate_output_ttl and self.work_api != "containers":
- raise Exception("--intermediate-output-ttl is only supported when using the containers api.")
+ raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+ if self.intermediate_output_ttl < 0:
+ raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- upload_workflow_deps(self, tool)
+ override_tools = {}
+ upload_workflow_deps(self, tool, override_tools)
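# override_tools maps each tool document id to its rewritten body (file
# references replaced with Keep locators) as recorded by upload_workflow_deps.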
# Reload tool object which may have been updated by
# upload_workflow_deps
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
makeTool=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
- metadata=tool.metadata)
+ metadata=tool.metadata,
+ override_tools=override_tools)
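# Passing override_tools through to arv_make_tool lets the reload pick up the
# rewritten documents (via the fetcher overrides above) instead of the originals.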
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % kwargs["name"],
tool, job_order)
if kwargs.get("submit"):
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool":
+ if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
kwargs["runnerjob"] = tool.tool["id"]
upload_dependencies(self,
kwargs["name"],
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
submit_runner_image=kwargs.get("submit_runner_image"))
-
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+ elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
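# Even a local run against the jobs API is recorded as a pipeline instance so
# its progress is visible in Workbench.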
self.pipeline = self.api.pipeline_instances().create(
body={
"http://arvados.org/cwl#PartitionRequirement",
"http://arvados.org/cwl#APIRequirement",
"http://commonwl.org/cwltool#LoadListingRequirement",
- "http://arvados.org/cwl#IntermediateOutput"
+ "http://arvados.org/cwl#IntermediateOutput",
+ "http://arvados.org/cwl#ReuseRequirement"
])
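# arv:ReuseRequirement turns job/container reuse on or off for an individual
# step; arv:IntermediateOutput sets a per-step TTL on intermediate output
# collections.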
def main(args, stdout, stderr, api_client=None, keep_client=None):
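# A minimal sketch, assuming the usual console entry point, of how main() is
# invoked (illustrative, not part of this file):
#
#     sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))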