From aa998f9f62321b44780923d60692beb6805b0ff5 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 25 Feb 2016 15:25:20 -0500 Subject: [PATCH] 8488: Fix output collection to accommodate reverse mapping fixes in cwltool. Catch errors and mark pipeline as terminated. --- sdk/cwl/arvados_cwl/__init__.py | 88 +++++++++++++++++++-------------- sdk/cwl/setup.py | 3 +- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 0d95ac4cc5..6cfdd0b2ab 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -17,6 +17,7 @@ import fnmatch import logging import re import os +import sys from cwltool.process import get_feature @@ -149,7 +150,7 @@ class ArvadosJob(object): response = self.arvrunner.api.jobs().create(body={ "script": "crunchrunner", "repository": "arvados", - "script_version": "master", + "script_version": "8488-cwl-crunchrunner-collection", "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"}, "runtime_constraints": runtime_constraints }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries) @@ -192,7 +193,8 @@ class ArvadosJob(object): try: outputs = {} if record["output"]: - outputs = self.collect_outputs("keep:" + record["output"]) + self.builder.outdir = "keep:" + record["output"] + outputs = self.collect_outputs(self.builder.outdir) except Exception as e: logger.exception("Got exception while collecting job outputs:") processStatus = "permanentFail" @@ -239,7 +241,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): def __init__(self, arvrunner, toolpath_object, **kwargs): - super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs) + super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs) self.arvrunner = arvrunner def 
makeJobRunner(self): @@ -323,49 +325,63 @@ class ArvCwlRunner(object): col.save_new("crunchrunner binary", ensure_unique_name=True) - self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]), - "components": {}, - "state": "RunningOnClient"}).execute(num_retries=self.num_retries) - self.fs_access = CollectionFsAccess(input_basedir) kwargs["fs_access"] = self.fs_access kwargs["enable_reuse"] = args.enable_reuse + kwargs["outdir"] = "$(task.outdir)" + kwargs["tmpdir"] = "$(task.tmpdir)" + if kwargs.get("conformance_test"): return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs) else: - jobiter = tool.job(job_order, - input_basedir, - self.output_callback, - **kwargs) - - for runnable in jobiter: - if runnable: - with self.lock: - runnable.run(**kwargs) - else: - if self.jobs: - try: - self.cond.acquire() - self.cond.wait() - finally: - self.cond.release() - else: - logger.error("Workflow cannot make any more progress.") - break - - while self.jobs: - try: - self.cond.acquire() - self.cond.wait() - finally: - self.cond.release() + self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]), + "components": {}, + "state": "RunningOnClient"}).execute(num_retries=self.num_retries) - events.close() + jobiter = tool.job(job_order, + input_basedir, + self.output_callback, + **kwargs) - if self.final_output is None: - raise cwltool.workflow.WorkflowException("Workflow did not return a result.") + try: + for runnable in jobiter: + if runnable: + with self.lock: + runnable.run(**kwargs) + else: + if self.jobs: + try: + self.cond.acquire() + self.cond.wait(1) + except RuntimeError: + pass + finally: + self.cond.release() + else: + logger.error("Workflow cannot make any more progress.") + break + + while self.jobs: + try: + self.cond.acquire() + self.cond.wait(1) + except RuntimeError: + pass + finally: + self.cond.release() + + events.close() + + if self.final_output is None: 
+ raise cwltool.workflow.WorkflowException("Workflow did not return a result.") + + except: + if sys.exc_info()[0] is not KeyboardInterrupt: + logger.exception("Caught unhandled exception, marking pipeline as failed") + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Failed"}).execute(num_retries=self.num_retries) return self.final_output diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py index 65ae16b515..cf9619c2be 100644 --- a/sdk/cwl/setup.py +++ b/sdk/cwl/setup.py @@ -30,7 +30,8 @@ setup(name='arvados-cwl-runner', 'bin/arvados-cwl-runner' ], install_requires=[ - 'cwltool>=1.0.20160129152024', + #'cwltool>=1.0.20160225040942', + 'cwltool', 'arvados-python-client>=0.1.20160122132348' ], zip_safe=True, -- 2.30.2