8442: Tweak internal handling of keep: paths, examine exit codes to determine
success/fail.
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 15 Jun 2016 15:23:47 +0000 (11:23 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 15 Jun 2016 20:09:46 +0000 (16:09 -0400)

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/runner.py

index ba26816c219bc871dfb85e6bfc6290916390d395..050b7b9a7e1a284fd78a4bb5097839dc3e8cb70b 100644 (file)
@@ -31,7 +31,7 @@ class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
     complete, and report output."""
 
-    def __init__(self, api_client, crunch2):
+    def __init__(self, api_client, crunch2=False):
         self.api = api_client
         self.jobs = {}
         self.lock = threading.Lock()
@@ -54,12 +54,12 @@ class ArvCwlRunner(object):
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
-
         else:
             logger.warn("Overall job status is %s", processStatus)
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+        self.final_status = processStatus
         self.final_output = out
 
     def on_message(self, event):
@@ -191,6 +191,9 @@ class ArvCwlRunner(object):
         finally:
             self.cond.release()
 
+        if self.final_status == "UnsupportedRequirement":
+            raise UnsupportedRequirement("Check log for details.")
+
         if self.final_output is None:
             raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
 
index 414ce636670c24d8d815438ee33e639b1f2d0a97..8b5ac5ac781d6ac5f01870063677a3b46772badc 100644 (file)
@@ -111,7 +111,17 @@ class ArvadosContainer(object):
     def done(self, record):
         try:
             if record["state"] == "Complete":
-                processStatus = "success"
+                rcode = record["exit_code"]
+                if self.successCodes and rcode in self.successCodes:
+                    processStatus = "success"
+                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
+                    processStatus = "temporaryFail"
+                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
+                    processStatus = "permanentFail"
+                elif rcode == 0:
+                    processStatus = "success"
+                else:
+                    processStatus = "permanentFail"
             else:
                 processStatus = "permanentFail"
 
@@ -152,7 +162,7 @@ class RunnerContainer(Runner):
         workflowname = os.path.basename(self.tool.tool["id"])
         workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
         workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[:workflowcollection.index('/')]
+        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
 
         container_image = arv_docker_get_image(self.arvrunner.api,
@@ -195,6 +205,7 @@ class RunnerContainer(Runner):
         }
 
     def run(self, *args, **kwargs):
+        kwargs["keepprefix"] = "keep:"
         job_spec = self.arvados_job_spec(*args, **kwargs)
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
index f7543482c75f2d5f07fc0c041031bb0b6fe011d3..002c0ca0d0ac606a34acc0776893a09be1daa1f8 100644 (file)
@@ -6,7 +6,7 @@ import json
 
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
 from cwltool.load_tool import fetch_document
 
 import arvados.collection
@@ -61,15 +61,16 @@ class Runner(object):
         adjustFiles(sc, partial(visitFiles, workflowfiles))
         adjustFiles(self.job_order, partial(visitFiles, jobfiles))
 
+        keepprefix = kwargs.get("keepprefix", "")
         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
-                                       "%s",
-                                       "%s/%s",
+                                       keepprefix+"%s",
+                                       keepprefix+"%s/%s",
                                        name=self.name,
                                        **kwargs)
 
         jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
-                                  "%s",
-                                  "%s/%s",
+                                  keepprefix+"%s",
+                                  keepprefix+"%s/%s",
                                   name=os.path.basename(self.job_order.get("id", "#")),
                                   **kwargs)
 
@@ -83,7 +84,15 @@ class Runner(object):
 
     def done(self, record):
         if record["state"] == "Complete":
-            processStatus = "success"
+            if record.get("exit_code") is not None:
+                if record["exit_code"] == 33:
+                    processStatus = "UnsupportedRequirement"
+                elif record["exit_code"] == 0:
+                    processStatus = "success"
+                else:
+                    processStatus = "permanentFail"
+            else:
+                processStatus = "success"
         else:
             processStatus = "permanentFail"
 
@@ -96,6 +105,8 @@ class Runner(object):
                 def keepify(path):
                     if not path.startswith("keep:"):
                         return "keep:%s/%s" % (record["output"], path)
+                    else:
+                        return path
                 adjustFiles(outputs, keepify)
             except Exception as e:
                 logger.error("While getting final output object: %s", e)