16139: Fix secondaryFile errors when running --submit --no-wait
author     Peter Amstutz <peter.amstutz@curii.com>
Fri, 7 Feb 2020 20:59:10 +0000 (15:59 -0500)
committer  Peter Amstutz <peter.amstutz@curii.com>
Fri, 7 Feb 2020 21:11:40 +0000 (16:11 -0500)
When submitting, we want to preserve the original CWL version of the
document.

However, when it creates a RunnerContainer (a cwltool.Process), it
examines the input interface and expects it to be in the 1.1 data
model.  But if we preserve/reload the original document, it is still
in the 1.0 data model, so the declared CWL version and the actual
data model no longer match.

To make this work, the code was deliberately overriding the CWL
version in the metadata.  That override is what produces the
secondaryFile errors reported on this ticket.

The fix is to maintain both the updated and the preserved documents
and to use each one where it is expected: the updated (1.1) document
for the runner's own input interface, and the preserved document for
submission.
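
For illustration only, here is a hypothetical, self-contained sketch
of the idea (the Doc stub and load_original helper are made up for
the example; the real types come from cwltool, and the actual changes
are in the diff below):

    class Doc:
        """Stand-in for a loaded cwltool Process/document."""
        def __init__(self, cwl_version, tool_id):
            self.metadata = {"cwlVersion": cwl_version}
            self.tool = {"id": tool_id}

    def load_original(tool_id):
        # Hypothetical reload with auto-updating disabled, so the
        # document keeps its original CWL version (here, v1.0).
        return Doc("v1.0", tool_id)

    def pick_documents(updated_tool, submitting):
        # Keep both documents and use each where it is expected.
        if submitting:
            # Reload the original so the submitted copy keeps its
            # original CWL version.
            tool = load_original(updated_tool.tool["id"])
        else:
            # Not submitting a runner process: run the updated doc.
            tool = updated_tool
        # The runner's own (1.1) input interface comes from
        # updated_tool; the embedded, submitted document is tool.
        return updated_tool, tool

    interface_doc, submitted_doc = pick_documents(
        Doc("v1.1", "wf.cwl"), submitting=True)
    assert interface_doc.metadata["cwlVersion"] == "v1.1"
    assert submitted_doc.metadata["cwlVersion"] == "v1.0"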

refs #16139

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/runner.py

index 406ebfd2da064df383105b8e0a7c8f4e7b19a529..99d4c4e9a10a883abc54ce0fe1cbc476af7c7692 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -521,10 +521,10 @@ The 'jobs' API is no longer supported.
             for req in job_reqs:
                 tool.requirements.append(req)
 
-    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
+    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
         self.debug = runtimeContext.debug
 
-        tool.visit(self.check_features)
+        updated_tool.visit(self.check_features)
 
         self.project_uuid = runtimeContext.project_uuid
         self.pipeline = None
@@ -545,16 +545,20 @@ The 'jobs' API is no longer supported.
             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
 
         if not runtimeContext.name:
-            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+            runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     tool, job_order)
+                                     updated_tool, job_order)
+
+        # the last clause means: if it is a command line tool, and we
+        # are going to wait for the result, and always_submit_runner
+        # is false, then we don't submit a runner process.
 
         submitting = (runtimeContext.update_workflow or
                       runtimeContext.create_workflow or
                       (runtimeContext.submit and not
-                       (tool.tool["class"] == "CommandLineTool" and
+                       (updated_tool.tool["class"] == "CommandLineTool" and
                         runtimeContext.wait and
                         not runtimeContext.always_submit_runner)))
 
@@ -564,8 +568,11 @@ The 'jobs' API is no longer supported.
         if submitting:
             # Document may have been auto-updated. Reload the original
             # document with updating disabled because we want to
-            # submit the original document, not the auto-updated one.
-            tool = load_tool(tool.tool["id"], loadingContext)
+            # submit the document with its original CWL version, not
+            # the auto-updated one.
+            tool = load_tool(updated_tool.tool["id"], loadingContext)
+        else:
+            tool = updated_tool
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
@@ -632,22 +639,23 @@ The 'jobs' API is no longer supported.
         if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
-                    runtimeContext.runnerjob = tool.tool["id"]
+                if submitting:
+                    tool = RunnerContainer(self, updated_tool,
+                                           tool, loadingContext, runtimeContext.enable_reuse,
+                                           self.output_name,
+                                           self.output_tags,
+                                           submit_runner_ram=runtimeContext.submit_runner_ram,
+                                           name=runtimeContext.name,
+                                           on_error=runtimeContext.on_error,
+                                           submit_runner_image=runtimeContext.submit_runner_image,
+                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
+                                           merged_map=merged_map,
+                                           priority=runtimeContext.priority,
+                                           secret_store=self.secret_store,
+                                           collection_cache_size=runtimeContext.collection_cache_size,
+                                           collection_cache_is_default=self.should_estimate_cache_size)
                 else:
-                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
-                                                self.output_name,
-                                                self.output_tags,
-                                                submit_runner_ram=runtimeContext.submit_runner_ram,
-                                                name=runtimeContext.name,
-                                                on_error=runtimeContext.on_error,
-                                                submit_runner_image=runtimeContext.submit_runner_image,
-                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
-                                                merged_map=merged_map,
-                                                priority=runtimeContext.priority,
-                                                secret_store=self.secret_store,
-                                                collection_cache_size=runtimeContext.collection_cache_size,
-                                                collection_cache_is_default=self.should_estimate_cache_size)
+                    runtimeContext.runnerjob = tool.tool["id"]
 
         if runtimeContext.cwl_runner_job is not None:
             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
index 19a6dd98b332c6dbc8363e989104a075cf90f587..2239e0f9df952b9ab75a7e9a96ab46953ab29f94 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -578,7 +578,8 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, loadingContext, enable_reuse,
+    def __init__(self, runner, updated_tool,
+                 tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
@@ -587,10 +588,9 @@ class Runner(Process):
                  collection_cache_is_default=True):
 
         loadingContext = loadingContext.copy()
-        loadingContext.metadata = loadingContext.metadata.copy()
-        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+        loadingContext.metadata = updated_tool.metadata.copy()
 
-        super(Runner, self).__init__(tool.tool, loadingContext)
+        super(Runner, self).__init__(updated_tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool