Merge branch '12195-nodemanager-quota-error'
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index be1ec27820fd9fd8aa33789e1095983f3b8b898c..7f4b5c7549314b0d0dbd3cfbf52b1023ad7887fd 100644 (file)
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
 
 # Implement cwl-runner interface for submitting and running work on Arvados, using
 # either the Crunch jobs API or Crunch containers API.
@@ -108,7 +111,9 @@ class ArvCwlRunner(object):
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
-                                                num_retries=self.num_retries)
+                                                num_retries=self.num_retries,
+                                                overrides=kwargs.get("override_tools"))
+        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -346,17 +351,24 @@ class ArvCwlRunner(object):
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
-        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+
         self.trash_intermediate = kwargs["trash_intermediate"]
+        if self.trash_intermediate and self.work_api != "containers":
+            raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
         if self.intermediate_output_ttl and self.work_api != "containers":
-            raise Exception("--intermediate-output-ttl is only supported when using the containers api.")
+            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+        if self.intermediate_output_ttl < 0:
+            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
 
         if not kwargs.get("name"):
             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        upload_workflow_deps(self, tool)
+        override_tools = {}
+        upload_workflow_deps(self, tool, override_tools)
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
@@ -364,7 +376,8 @@ class ArvCwlRunner(object):
                                   makeTool=self.arv_make_tool,
                                   loader=tool.doc_loader,
                                   avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata)
+                                  metadata=tool.metadata,
+                                  override_tools=override_tools)
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % kwargs["name"],
@@ -412,14 +425,8 @@ class ArvCwlRunner(object):
         if kwargs.get("submit"):
             # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool":
+                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                     kwargs["runnerjob"] = tool.tool["id"]
-                    upload_dependencies(self,
-                                        kwargs["name"],
-                                        tool.doc_loader,
-                                        tool.tool,
-                                        tool.tool["id"],
-                                        False)
                     runnerjob = tool.job(job_order,
                                          self.output_callback,
                                          **kwargs).next()
@@ -440,8 +447,7 @@ class ArvCwlRunner(object):
                                       name=kwargs.get("name"),
                                       on_error=kwargs.get("on_error"),
                                       submit_runner_image=kwargs.get("submit_runner_image"))
-
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
@@ -677,7 +683,8 @@ def add_arv_hints():
         "http://arvados.org/cwl#PartitionRequirement",
         "http://arvados.org/cwl#APIRequirement",
         "http://commonwl.org/cwltool#LoadListingRequirement",
-        "http://arvados.org/cwl#IntermediateOutput"
+        "http://arvados.org/cwl#IntermediateOutput",
+        "http://arvados.org/cwl#ReuseRequirement"
     ])
 
 def main(args, stdout, stderr, api_client=None, keep_client=None):
@@ -744,6 +751,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     arvargs.use_container = True
     arvargs.relax_path_checks = True
     arvargs.validate = None
+    arvargs.print_supported_versions = False
 
     make_fs_access = partial(CollectionFsAccess,
                            collection_cache=runner.collection_cache)