X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f7ef4902ede928417e91efc403c0db3efe6ce81d..4a78e8e91fdad38e567fef0cd43aa8cb6bd33580:/sdk/cwl/arvados_cwl/arvjob.py diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 737c9580fb..ea599ea837 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -6,11 +6,12 @@ import logging import re import copy import json +import datetime import time from cwltool.process import get_feature, shortname, UnsupportedRequirement from cwltool.errors import WorkflowException -from cwltool.draft2tool import revmap_file, CommandLineTool +from cwltool.command_line_tool import revmap_file, CommandLineTool from cwltool.load_tool import fetch_document from cwltool.builder import Builder from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class @@ -67,7 +68,11 @@ class ArvadosJob(object): if vwd: with Perf(metrics, "generatefiles.save_new %s" % self.name): - vwd.save_new() + info = self._get_intermediate_collection_info() + vwd.save_new(name=info["name"], + ensure_unique_name=True, + trash_at=info["trash_at"], + properties=info["properties"]) for f, p in generatemapper.items(): if p.type == "File": @@ -134,6 +139,8 @@ class ArvadosJob(object): if reuse_req: enable_reuse = reuse_req["enableReuse"] + self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) + try: with Perf(metrics, "create %s" % self.name): response = self.arvrunner.api.jobs().create( @@ -150,7 +157,8 @@ class ArvadosJob(object): find_or_create=enable_reuse ).execute(num_retries=self.arvrunner.num_retries) - self.arvrunner.processes[response["uuid"]] = self + self.uuid = response["uuid"] + self.arvrunner.process_submitted(self) self.update_pipeline_component(response) @@ -171,9 +179,6 @@ class ArvadosJob(object): logger.info("Creating read permission on job %s: %s", response["uuid"], e) - - with Perf(metrics, "done %s" % self.name): - self.done(response) else: logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"]) except Exception as e: @@ -181,27 +186,28 @@ class ArvadosJob(object): self.output_callback({}, "permanentFail") def update_pipeline_component(self, record): - if self.arvrunner.pipeline: - self.arvrunner.pipeline["components"][self.name] = {"job": record} - with Perf(metrics, "update_pipeline_component %s" % self.name): - self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update( - uuid=self.arvrunner.pipeline["uuid"], - body={ - "components": self.arvrunner.pipeline["components"] - }).execute(num_retries=self.arvrunner.num_retries) - if self.arvrunner.uuid: - try: - job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute() - if job: - components = job["components"] - components[self.name] = record["uuid"] - self.arvrunner.api.jobs().update( - uuid=self.arvrunner.uuid, + with self.arvrunner.workflow_eval_lock: + if self.arvrunner.pipeline: + self.arvrunner.pipeline["components"][self.name] = {"job": record} + with Perf(metrics, "update_pipeline_component %s" % self.name): + self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update( + uuid=self.arvrunner.pipeline["uuid"], body={ - "components": components + "components": self.arvrunner.pipeline["components"] }).execute(num_retries=self.arvrunner.num_retries) - except Exception as e: - logger.info("Error adding to components: %s", e) + if self.arvrunner.uuid: + try: + job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute() + if job: + components = job["components"] + components[self.name] = record["uuid"] + self.arvrunner.api.jobs().update( + uuid=self.arvrunner.uuid, + body={ + "components": components + }).execute(num_retries=self.arvrunner.num_retries) + except Exception as e: + logger.info("Error adding to components: %s", e) def done(self, record): try: @@ -263,8 +269,27 @@ class ArvadosJob(object): processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + + def _get_intermediate_collection_info(self): + trash_time = None + if self.arvrunner.intermediate_output_ttl > 0: + trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl) + + current_container_uuid = None + try: + current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries) + current_container_uuid = current_container['uuid'] + except ApiError as e: + # Status code 404 just means we're not running in a container. + if e.resp.status != 404: + logger.info("Getting current container: %s", e) + props = {"type": "Intermediate", + "container": current_container_uuid} + + return {"name" : "Intermediate collection", + "trash_at" : trash_time, + "properties" : props} + class RunnerJob(Runner): """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" @@ -280,7 +305,7 @@ class RunnerJob(Runner): if self.tool.tool["id"].startswith("keep:"): self.job_order["cwl:tool"] = self.tool.tool["id"][5:] else: - packed = packed_workflow(self.arvrunner, self.tool) + packed = packed_workflow(self.arvrunner, self.tool, self.merged_map) wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed) self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh @@ -299,6 +324,9 @@ class RunnerJob(Runner): if self.on_error: self.job_order["arv:on_error"] = self.on_error + if kwargs.get("debug"): + self.job_order["arv:debug"] = True + return { "script": "cwl-runner", "script_version": "master", @@ -311,8 +339,8 @@ class RunnerJob(Runner): } } - def run(self, *args, **kwargs): - job_spec = self.arvados_job_spec(*args, **kwargs) + def run(self, **kwargs): + job_spec = self.arvados_job_spec(**kwargs) job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) @@ -348,10 +376,7 @@ class RunnerJob(Runner): return self.uuid = job["uuid"] - self.arvrunner.processes[self.uuid] = self - - if job["state"] in ("Complete", "Failed", "Cancelled"): - self.done(job) + self.arvrunner.process_submitted(self) class RunnerTemplate(object): @@ -367,7 +392,7 @@ class RunnerTemplate(object): } def __init__(self, runner, tool, job_order, enable_reuse, uuid, - submit_runner_ram=0, name=None): + submit_runner_ram=0, name=None, merged_map=None): self.runner = runner self.tool = tool self.job = RunnerJob( @@ -378,7 +403,8 @@ class RunnerTemplate(object): output_name=None, output_tags=None, submit_runner_ram=submit_runner_ram, - name=name) + name=name, + merged_map=merged_map) self.uuid = uuid def pipeline_component_spec(self):