+class RunnerJob(object):
+ """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+ def __init__(self, runner, tool, job_order, enable_reuse):
+ self.arvrunner = runner
+ self.tool = tool
+ self.job_order = job_order
+ self.running = False
+ self.enable_reuse = enable_reuse
+
+ def update_pipeline_component(self, record):
+ pass
+
+ def upload_docker(self, tool):
+ if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+ (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+ if docker_req:
+ arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+ elif isinstance(tool, cwltool.workflow.Workflow):
+ for s in tool.steps:
+ self.upload_docker(s.embedded_tool)
+
+ def run(self, dry_run=False, pull_image=True, **kwargs):
+ self.upload_docker(self.tool)
+
+ workflowfiles = set()
+ jobfiles = set()
+ workflowfiles.add(self.tool.tool["id"])
+
+ self.name = os.path.basename(self.tool.tool["id"])
+
+ def visitFiles(files, path):
+ files.add(path)
+ return path
+
+ document_loader, _, _ = cwltool.process.get_schema()
+ def loadref(b, u):
+ return document_loader.resolve_ref(u, base_url=b)[0]
+
+ sc = scandeps("", self.tool.tool,
+ set(("$import", "run")),
+ set(("$include", "$schemas", "path")),
+ loadref)
+ adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
+ adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+
+ workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+ "%s",
+ "%s/%s",
+ name=self.name,
+ **kwargs)
+
+ jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+ "%s",
+ "%s/%s",
+ name=os.path.basename(self.job_order.get("id", "#")),
+ **kwargs)
+
+ adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+ if "id" in self.job_order:
+ del self.job_order["id"]
+
+ self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+
+ response = self.arvrunner.api.jobs().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "script": "cwl-runner",
+ "script_version": "master",
+ "repository": "arvados",
+ "script_parameters": self.job_order,
+ "runtime_constraints": {
+ "docker_image": "arvados/jobs"
+ }
+ }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+
+ self.arvrunner.jobs[response["uuid"]] = self
+
+ logger.info("Submitted job %s", response["uuid"])
+
+ if response["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(response)
+
+ def done(self, record):
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ outputs = None
+ try:
+ try:
+ outc = arvados.collection.Collection(record["output"])
+ with outc.open("cwl.output.json") as f:
+ outputs = json.load(f)
+ except Exception as e:
+ logger.error("While getting final output object: %s", e)
+ self.arvrunner.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
+