X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1aa2903df6165ecc8164c40957f282d5f7174b21..48c38895200cdafaaeca37299bf8352878389a77:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 406ebfd2da..99d4c4e9a1 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -521,10 +521,10 @@ The 'jobs' API is no longer supported. for req in job_reqs: tool.requirements.append(req) - def arv_executor(self, tool, job_order, runtimeContext, logger=None): + def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None): self.debug = runtimeContext.debug - tool.visit(self.check_features) + updated_tool.visit(self.check_features) self.project_uuid = runtimeContext.project_uuid self.pipeline = None @@ -545,16 +545,20 @@ The 'jobs' API is no longer supported. raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) if not runtimeContext.name: - runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"]) + runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"]) # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % runtimeContext.name, - tool, job_order) + updated_tool, job_order) + + # the last clause means: if it is a command line tool, and we + # are going to wait for the result, and always_submit_runner + # is false, then we don't submit a runner process. submitting = (runtimeContext.update_workflow or runtimeContext.create_workflow or (runtimeContext.submit and not - (tool.tool["class"] == "CommandLineTool" and + (updated_tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and not runtimeContext.always_submit_runner))) @@ -564,8 +568,11 @@ The 'jobs' API is no longer supported. if submitting: # Document may have been auto-updated. Reload the original # document with updating disabled because we want to - # submit the original document, not the auto-updated one. - tool = load_tool(tool.tool["id"], loadingContext) + # submit the document with its original CWL version, not + # the auto-updated one. + tool = load_tool(updated_tool.tool["id"], loadingContext) + else: + tool = updated_tool # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. @@ -632,22 +639,23 @@ The 'jobs' API is no longer supported. if runtimeContext.submit: # Submit a runner job to run the workflow for us. if self.work_api == "containers": - if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner): - runtimeContext.runnerjob = tool.tool["id"] + if submitting: + tool = RunnerContainer(self, updated_tool, + tool, loadingContext, runtimeContext.enable_reuse, + self.output_name, + self.output_tags, + submit_runner_ram=runtimeContext.submit_runner_ram, + name=runtimeContext.name, + on_error=runtimeContext.on_error, + submit_runner_image=runtimeContext.submit_runner_image, + intermediate_output_ttl=runtimeContext.intermediate_output_ttl, + merged_map=merged_map, + priority=runtimeContext.priority, + secret_store=self.secret_store, + collection_cache_size=runtimeContext.collection_cache_size, + collection_cache_is_default=self.should_estimate_cache_size) else: - tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse, - self.output_name, - self.output_tags, - submit_runner_ram=runtimeContext.submit_runner_ram, - name=runtimeContext.name, - on_error=runtimeContext.on_error, - submit_runner_image=runtimeContext.submit_runner_image, - intermediate_output_ttl=runtimeContext.intermediate_output_ttl, - merged_map=merged_map, - priority=runtimeContext.priority, - secret_store=self.secret_store, - collection_cache_size=runtimeContext.collection_cache_size, - collection_cache_is_default=self.should_estimate_cache_size) + runtimeContext.runnerjob = tool.tool["id"] if runtimeContext.cwl_runner_job is not None: self.uuid = runtimeContext.cwl_runner_job.get('uuid')