From 6272cb13e54254ad5ab3da898716daaab3ddae6a Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 7 Feb 2020 15:59:10 -0500 Subject: [PATCH] 16139: Fix secondaryFile errors when running --submit --no-wait When submitting, we want to preserve the original CWL version of the document. However, when it creates a RunnerContainer (a cwltool.Process) it examines the input interface and expects it to be in the 1.1 data model. But if we preserve/reload the original document, it is still in the 1.0 data model. This causes The code was deliberately overriding the CWL version in metadata to make this work. This results in the problem reported on this ticket. The fix is to maintain both the updated and preserved documents, and use them appropriately where they are expected. refs #16139 Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/executor.py | 52 +++++++++++++++++++-------------- sdk/cwl/arvados_cwl/runner.py | 8 ++--- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 406ebfd2da..99d4c4e9a1 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -521,10 +521,10 @@ The 'jobs' API is no longer supported. for req in job_reqs: tool.requirements.append(req) - def arv_executor(self, tool, job_order, runtimeContext, logger=None): + def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None): self.debug = runtimeContext.debug - tool.visit(self.check_features) + updated_tool.visit(self.check_features) self.project_uuid = runtimeContext.project_uuid self.pipeline = None @@ -545,16 +545,20 @@ The 'jobs' API is no longer supported. raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) if not runtimeContext.name: - runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) + runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"]) # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % runtimeContext.name, - tool, job_order) + updated_tool, job_order) + + # the last clause means: if it is a command line tool, and we + # are going to wait for the result, and always_submit_runner + # is false, then we don't submit a runner process. submitting = (runtimeContext.update_workflow or runtimeContext.create_workflow or (runtimeContext.submit and not - (tool.tool["class"] == "CommandLineTool" and + (updated_tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and not runtimeContext.always_submit_runner))) @@ -564,8 +568,11 @@ The 'jobs' API is no longer supported. if submitting: # Document may have been auto-updated. Reload the original # document with updating disabled because we want to - # submit the original document, not the auto-updated one. - tool = load_tool(tool.tool["id"], loadingContext) + # submit the document with its original CWL version, not + # the auto-updated one. + tool = load_tool(updated_tool.tool["id"], loadingContext) + else: + tool = updated_tool # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. @@ -632,22 +639,23 @@ The 'jobs' API is no longer supported. if runtimeContext.submit: # Submit a runner job to run the workflow for us. if self.work_api == "containers": - if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner): - runtimeContext.runnerjob = tool.tool["id"] + if submitting: + tool = RunnerContainer(self, updated_tool, + tool, loadingContext, runtimeContext.enable_reuse, + self.output_name, + self.output_tags, + submit_runner_ram=runtimeContext.submit_runner_ram, + name=runtimeContext.name, + on_error=runtimeContext.on_error, + submit_runner_image=runtimeContext.submit_runner_image, + intermediate_output_ttl=runtimeContext.intermediate_output_ttl, + merged_map=merged_map, + priority=runtimeContext.priority, + secret_store=self.secret_store, + collection_cache_size=runtimeContext.collection_cache_size, + collection_cache_is_default=self.should_estimate_cache_size) else: - tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse, - self.output_name, - self.output_tags, - submit_runner_ram=runtimeContext.submit_runner_ram, - name=runtimeContext.name, - on_error=runtimeContext.on_error, - submit_runner_image=runtimeContext.submit_runner_image, - intermediate_output_ttl=runtimeContext.intermediate_output_ttl, - merged_map=merged_map, - priority=runtimeContext.priority, - secret_store=self.secret_store, - collection_cache_size=runtimeContext.collection_cache_size, - collection_cache_is_default=self.should_estimate_cache_size) + runtimeContext.runnerjob = tool.tool["id"] if runtimeContext.cwl_runner_job is not None: self.uuid = runtimeContext.cwl_runner_job.get('uuid') diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 19a6dd98b3..2239e0f9df 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -578,7 +578,8 @@ class Runner(Process): """Base class for runner processes, which submit an instance of arvados-cwl-runner and wait for the final result.""" - def __init__(self, runner, tool, loadingContext, enable_reuse, + def __init__(self, runner, updated_tool, + tool, loadingContext, enable_reuse, output_name, output_tags, submit_runner_ram=0, name=None, on_error=None, submit_runner_image=None, intermediate_output_ttl=0, merged_map=None, @@ -587,10 +588,9 @@ class Runner(Process): collection_cache_is_default=True): loadingContext = loadingContext.copy() - loadingContext.metadata = loadingContext.metadata.copy() - loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION + loadingContext.metadata = updated_tool.metadata.copy() - super(Runner, self).__init__(tool.tool, loadingContext) + super(Runner, self).__init__(updated_tool.tool, loadingContext) self.arvrunner = runner self.embedded_tool = tool -- 2.30.2