X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5dbc1ae3d451f904654a2a61e5df620808ac175d..d7d074d790366338a01736552e916c5e4b5cef69:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 100096ab17..27774b2f7c 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -559,7 +559,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods uuid=existing_uuid, submit_runner_ram=runtimeContext.submit_runner_ram, name=runtimeContext.name, - merged_map=merged_map) + merged_map=merged_map, + loadingContext=loadingContext) tmpl.save() # cwltool.main will write our return value to stdout. return (tmpl.uuid, "success") @@ -618,11 +619,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if self.work_api == "containers": if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner): runtimeContext.runnerjob = tool.tool["id"] - runnerjob = tool.job(job_order, - self.output_callback, - runtimeContext).next() else: - runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse, + tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse, self.output_name, self.output_tags, submit_runner_ram=runtimeContext.submit_runner_ram, @@ -636,7 +634,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods collection_cache_size=runtimeContext.collection_cache_size, collection_cache_is_default=self.should_estimate_cache_size) elif self.work_api == "jobs": - runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse, + tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse, self.output_name, self.output_tags, submit_runner_ram=runtimeContext.submit_runner_ram, @@ -654,10 +652,16 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods "state": "RunningOnClient"}).execute(num_retries=self.num_retries) logger.info("Pipeline instance %s", self.pipeline["uuid"]) - if runnerjob and not runtimeContext.wait: - submitargs = runtimeContext.copy() - submitargs.submit = False - runnerjob.run(submitargs) + if runtimeContext.cwl_runner_job is not None: + self.uuid = runtimeContext.cwl_runner_job.get('uuid') + + jobiter = tool.job(job_order, + self.output_callback, + runtimeContext) + + if runtimeContext.submit and not runtimeContext.wait: + runnerjob = jobiter.next() + runnerjob.run(runtimeContext) return (runnerjob.uuid, "success") current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) @@ -672,14 +676,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods try: self.workflow_eval_lock.acquire() - if runnerjob: - jobiter = iter((runnerjob,)) - else: - if runtimeContext.cwl_runner_job is not None: - self.uuid = runtimeContext.cwl_runner_job.get('uuid') - jobiter = tool.job(job_order, - self.output_callback, - runtimeContext) # Holds the lock while this code runs and releases it when # it is safe to do so in self.workflow_eval_lock.wait(), @@ -728,8 +724,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if self.pipeline: self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], body={"state": "Failed"}).execute(num_retries=self.num_retries) - if runnerjob and runnerjob.uuid and self.work_api == "containers": - self.api.container_requests().update(uuid=runnerjob.uuid, + if runtimeContext.submit and isinstance(tool, Runner): + runnerjob = tool + if runnerjob.uuid and self.work_api == "containers": + self.api.container_requests().update(uuid=runnerjob.uuid, body={"priority": "0"}).execute(num_retries=self.num_retries) finally: self.workflow_eval_lock.release() @@ -744,8 +742,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods if self.final_output is None: raise WorkflowException("Workflow did not return a result.") - if runtimeContext.submit and isinstance(runnerjob, Runner): - logger.info("Final output collection %s", runnerjob.final_output) + if runtimeContext.submit and isinstance(tool, Runner): + logger.info("Final output collection %s", tool.final_output) else: if self.output_name is None: self.output_name = "Output of %s" % (shortname(tool.tool["id"]))