8442: Bugfixes from end-to-end testing.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 0cc23ab45953a52a139c91a18fd5527547e0088f..629b1042bb75400b9e8c6b05dacd65e3876362fc 100644 (file)
@@ -1,15 +1,21 @@
 import os
 import urlparse
 from functools import partial
+import logging
+import json
 
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
 from cwltool.load_tool import fetch_document
 
+import arvados.collection
+
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
 
+logger = logging.getLogger('arvados.cwl-runner')
+
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
@@ -17,6 +23,7 @@ class Runner(object):
         self.job_order = job_order
         self.running = False
         self.enable_reuse = enable_reuse
+        self.uuid = None
 
     def update_pipeline_component(self, record):
         pass
@@ -45,8 +52,14 @@ class Runner(object):
             return path
 
         document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+        loaded = set()
         def loadref(b, u):
-            return document_loader.fetch(urlparse.urljoin(b, u))
+            joined = urlparse.urljoin(b, u)
+            if joined not in loaded:
+                loaded.add(joined)
+                return document_loader.fetch(urlparse.urljoin(b, u))
+            else:
+                return {}
 
         sc = scandeps(uri, workflowobj,
                       set(("$import", "run")),
@@ -55,15 +68,16 @@ class Runner(object):
         adjustFiles(sc, partial(visitFiles, workflowfiles))
         adjustFiles(self.job_order, partial(visitFiles, jobfiles))
 
+        keepprefix = kwargs.get("keepprefix", "")
         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
-                                       "%s",
-                                       "%s/%s",
+                                       keepprefix+"%s",
+                                       keepprefix+"%s/%s",
                                        name=self.name,
                                        **kwargs)
 
         jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
-                                  "%s",
-                                  "%s/%s",
+                                  keepprefix+"%s",
+                                  keepprefix+"%s/%s",
                                   name=os.path.basename(self.job_order.get("id", "#")),
                                   **kwargs)
 
@@ -77,7 +91,15 @@ class Runner(object):
 
     def done(self, record):
         if record["state"] == "Complete":
-            processStatus = "success"
+            if record.get("exit_code") is not None:
+                if record["exit_code"] == 33:
+                    processStatus = "UnsupportedRequirement"
+                elif record["exit_code"] == 0:
+                    processStatus = "success"
+                else:
+                    processStatus = "permanentFail"
+            else:
+                processStatus = "success"
         else:
             processStatus = "permanentFail"
 
@@ -90,9 +112,11 @@ class Runner(object):
                 def keepify(path):
                     if not path.startswith("keep:"):
                         return "keep:%s/%s" % (record["output"], path)
+                    else:
+                        return path
                 adjustFiles(outputs, keepify)
             except Exception as e:
                 logger.error("While getting final output object: %s", e)
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.jobs[record["uuid"]]
+            del self.arvrunner.processes[record["uuid"]]