8488: Fix output collection to accommodate reverse mapping fixes in cwltool.
author Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 25 Feb 2016 20:25:20 +0000 (15:25 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 25 Feb 2016 20:25:20 +0000 (15:25 -0500)
Catch errors and mark pipeline as terminated.

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/setup.py

index 0d95ac4cc522dde11059c8315465f1fbb42fe7fe..6cfdd0b2ab565d01b69678f36f32cd1189afca12 100644 (file)
@@ -17,6 +17,7 @@ import fnmatch
 import logging
 import re
 import os
+import sys
 
 from cwltool.process import get_feature
 
@@ -149,7 +150,7 @@ class ArvadosJob(object):
             response = self.arvrunner.api.jobs().create(body={
                 "script": "crunchrunner",
                 "repository": "arvados",
-                "script_version": "master",
+                "script_version": "8488-cwl-crunchrunner-collection",
                 "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                 "runtime_constraints": runtime_constraints
             }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
@@ -192,7 +193,8 @@ class ArvadosJob(object):
             try:
                 outputs = {}
                 if record["output"]:
-                    outputs = self.collect_outputs("keep:" + record["output"])
+                    self.builder.outdir = "keep:" + record["output"]
+                    outputs = self.collect_outputs(self.builder.outdir)
             except Exception as e:
                 logger.exception("Got exception while collecting job outputs:")
                 processStatus = "permanentFail"
@@ -239,7 +241,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
+        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
 
     def makeJobRunner(self):
@@ -323,49 +325,63 @@ class ArvCwlRunner(object):
 
                 col.save_new("crunchrunner binary", ensure_unique_name=True)
 
-        self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
-                                                                   "components": {},
-                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
-
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = args.enable_reuse
 
+        kwargs["outdir"] = "$(task.outdir)"
+        kwargs["tmpdir"] = "$(task.tmpdir)"
+
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            jobiter = tool.job(job_order,
-                            input_basedir,
-                            self.output_callback,
-                            **kwargs)
-
-            for runnable in jobiter:
-                if runnable:
-                    with self.lock:
-                        runnable.run(**kwargs)
-                else:
-                    if self.jobs:
-                        try:
-                            self.cond.acquire()
-                            self.cond.wait()
-                        finally:
-                            self.cond.release()
-                    else:
-                        logger.error("Workflow cannot make any more progress.")
-                        break
-
-            while self.jobs:
-                try:
-                    self.cond.acquire()
-                    self.cond.wait()
-                finally:
-                    self.cond.release()
+            self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
+                                                                   "components": {},
+                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
-            events.close()
+            jobiter = tool.job(job_order,
+                               input_basedir,
+                               self.output_callback,
+                               **kwargs)
 
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+            try:
+                for runnable in jobiter:
+                    if runnable:
+                        with self.lock:
+                            runnable.run(**kwargs)
+                    else:
+                        if self.jobs:
+                            try:
+                                self.cond.acquire()
+                                self.cond.wait(1)
+                            except RuntimeError:
+                                pass
+                            finally:
+                                self.cond.release()
+                        else:
+                            logger.error("Workflow cannot make any more progress.")
+                            break
+
+                while self.jobs:
+                    try:
+                        self.cond.acquire()
+                        self.cond.wait(1)
+                    except RuntimeError:
+                        pass
+                    finally:
+                        self.cond.release()
+
+                events.close()
+
+                if self.final_output is None:
+                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+            except:
+                if sys.exc_info()[0] is not KeyboardInterrupt:
+                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
 
             return self.final_output
 
index 65ae16b5158aebe388afc7f42e2e247f4e13733f..cf9619c2be60f927a27722a30c218321a66a2daa 100644 (file)
@@ -30,7 +30,8 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool>=1.0.20160129152024',
+          #'cwltool>=1.0.20160225040942',
+          'cwltool',
           'arvados-python-client>=0.1.20160122132348'
       ],
       zip_safe=True,