X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a08e3bb86caa758df7d33a3df3f6b8c333e47838..138fef8ee97f3cbd335434ad6acd26771fd0b762:/sdk/cwl/arvados_cwl/arvjob.py diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 25f64ea230..e222152a16 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -10,7 +10,7 @@ import time from cwltool.process import get_feature, shortname, UnsupportedRequirement from cwltool.errors import WorkflowException -from cwltool.draft2tool import revmap_file, CommandLineTool +from cwltool.command_line_tool import revmap_file, CommandLineTool from cwltool.load_tool import fetch_document from cwltool.builder import Builder from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class @@ -134,6 +134,8 @@ class ArvadosJob(object): if reuse_req: enable_reuse = reuse_req["enableReuse"] + self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) + try: with Perf(metrics, "create %s" % self.name): response = self.arvrunner.api.jobs().create( @@ -150,7 +152,8 @@ class ArvadosJob(object): find_or_create=enable_reuse ).execute(num_retries=self.arvrunner.num_retries) - self.arvrunner.processes[response["uuid"]] = self + self.uuid = response["uuid"] + self.arvrunner.process_submitted(self) self.update_pipeline_component(response) @@ -263,8 +266,8 @@ class ArvadosJob(object): processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + self.arvrunner.process_done(record["uuid"]) + class RunnerJob(Runner): """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" @@ -280,7 +283,7 @@ class RunnerJob(Runner): if self.tool.tool["id"].startswith("keep:"): self.job_order["cwl:tool"] = self.tool.tool["id"][5:] else: - packed = packed_workflow(self.arvrunner, self.tool) + packed = packed_workflow(self.arvrunner, self.tool, self.merged_map) wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed) self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh @@ -314,8 +317,8 @@ class RunnerJob(Runner): } } - def run(self, *args, **kwargs): - job_spec = self.arvados_job_spec(*args, **kwargs) + def run(self, **kwargs): + job_spec = self.arvados_job_spec(**kwargs) job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) @@ -351,7 +354,7 @@ class RunnerJob(Runner): return self.uuid = job["uuid"] - self.arvrunner.processes[self.uuid] = self + self.arvrunner.process_submitted(self) if job["state"] in ("Complete", "Failed", "Cancelled"): self.done(job) @@ -370,7 +373,7 @@ class RunnerTemplate(object): } def __init__(self, runner, tool, job_order, enable_reuse, uuid, - submit_runner_ram=0, name=None): + submit_runner_ram=0, name=None, merged_map=None): self.runner = runner self.tool = tool self.job = RunnerJob( @@ -381,7 +384,8 @@ class RunnerTemplate(object): output_name=None, output_tags=None, submit_runner_ram=submit_runner_ram, - name=name) + name=name, + merged_map=merged_map) self.uuid = uuid def pipeline_component_spec(self):