Merge branch '10186-crunch2-slurm-partition' closes #10186
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 315be0c8b36c10a8bd65bf9960724df6011df9e6..e5b4e006e8cce7cac780436fc06c6dcd79882730 100644
@@ -4,6 +4,7 @@ from functools import partial
 import logging
 import json
 import re
+from cStringIO import StringIO
 
 import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
@@ -13,22 +14,57 @@ from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import arvados.collection
+import ruamel.yaml as yaml
 
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
 
 logger = logging.getLogger('arvados.cwl-runner')
 
-cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
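+# Relax cwltool's filename whitelist so any file name is accepted.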
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+
+def trim_listing(obj):
+    """Remove 'listing' field from Directory objects that are keep references.
+
+    When Directory objects represent Keep references, it is redundant and
+    potentially very expensive to pass fully enumerated Directory objects
+    between instances of cwl-runner (e.g. when submitting a job or using the
+    RunInSingleContainer feature), so delete the 'listing' field when it is
+    safe to do so.
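+
+    Typically applied in place with cwltool's adjustDirObjs, for example:
+
+        adjustDirObjs(job_order, trim_listing)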
+    """
+
+    if obj.get("location", "").startswith("keep:") and "listing" in obj:
+        del obj["listing"]
+    if obj.get("location", "").startswith("_:"):
+        del obj["location"]
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, keepprefix, loadref_run):
+                        workflowobj, uri, loadref_run):
+    """Upload the dependencies of the workflowobj document to Keep.
+
+    Returns a pathmapper object mapping local paths to keep references.  Also
+    does an in-place update of references in "workflowobj".
+
+    Use scandeps to find $import, $include, $schemas, run, File and Directory
+    fields that represent external references.
+
+    If workflowobj has an "id" field, this will reload the document to ensure
+    it is scanning the raw document prior to preprocessing.
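+
+    For example (a sketch; "tool" stands in for a loaded cwltool tool object
+    and "my-workflow" is a placeholder name):
+
+        mapper = upload_dependencies(arvrunner, "my-workflow",
+                                     tool.doc_loader, tool.tool,
+                                     tool.tool["id"], True)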
+    """
+
     loaded = set()
     def loadref(b, u):
         joined = urlparse.urljoin(b, u)
-        if joined not in loaded:
-            loaded.add(joined)
-            return document_loader.fetch(urlparse.urljoin(b, u))
+        defrg, _ = urlparse.urldefrag(joined)
+        if defrg not in loaded:
+            loaded.add(defrg)
+            # Use fetch_text to get raw file (before preprocessing).
+            text = document_loader.fetch_text(defrg)
+            if isinstance(text, bytes):
+                textIO = StringIO(text.decode('utf-8'))
+            else:
+                textIO = StringIO(text)
+            return yaml.safe_load(textIO)
         else:
             return {}
 
@@ -37,27 +73,30 @@ def upload_dependencies(arvrunner, name, document_loader,
     else:
         loadref_fields = set(("$import",))
 
-    sc = scandeps(uri, workflowobj,
+    scanobj = workflowobj
+    if "id" in workflowobj:
+        # Need raw file content (before preprocessing) to ensure
+        # that external references in $include and $mixin are captured.
+        scanobj = loadref("", workflowobj["id"])
+
+    sc = scandeps(uri, scanobj,
                   loadref_fields,
-                  set(("$include", "$schemas", "path", "location")),
+                  set(("$include", "$schemas", "location")),
                   loadref)
 
-    files = []
-    def visitFiles(path):
-        files.append(path)
+    normalizeFilesDirs(sc)
 
-    adjustFileObjs(sc, visitFiles)
-    adjustDirObjs(sc, visitFiles)
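+    # Include the top-level workflow document itself among the files to upload.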
+    if "id" in workflowobj:
+        sc.append({"class": "File", "location": workflowobj["id"]})
 
-    normalizeFilesDirs(files)
-
-    mapper = ArvPathMapper(arvrunner, files, "",
-                           keepprefix+"%s",
-                           keepprefix+"%s/%s",
+    mapper = ArvPathMapper(arvrunner, sc, "",
+                           "keep:%s",
+                           "keep:%s/%s",
                            name=name)
 
     def setloc(p):
-        p["location"] = mapper.mapper(p["location"]).target
+        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+            p["location"] = mapper.mapper(p["location"]).resolved
     adjustFileObjs(workflowobj, setloc)
     adjustDirObjs(workflowobj, setloc)
 
@@ -75,13 +114,15 @@ def upload_docker(arvrunner, tool):
 
 
 class Runner(object):
-    def __init__(self, runner, tool, job_order, enable_reuse):
+    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
         self.running = False
         self.enable_reuse = enable_reuse
         self.uuid = None
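+        # Collection containing the final output, and the requested name for
+        # the output collection (if any).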
+        self.final_output = None
+        self.output_name = output_name
 
     def update_pipeline_component(self, record):
         pass
@@ -96,7 +137,6 @@ class Runner(object):
                                              self.tool.doc_loader,
                                              self.tool.tool,
                                              self.tool.tool["id"],
-                                             kwargs.get("keepprefix", ""),
                                              True)
 
         jobmapper = upload_dependencies(self.arvrunner,
@@ -104,9 +144,10 @@ class Runner(object):
                                         self.tool.doc_loader,
                                         self.job_order,
                                         self.job_order.get("id", "#"),
-                                        kwargs.get("keepprefix", ""),
                                         False)
 
+        adjustDirObjs(self.job_order, trim_listing)
+
         if "id" in self.job_order:
             del self.job_order["id"]
 
@@ -130,7 +171,11 @@ class Runner(object):
         outputs = None
         try:
             try:
-                outc = arvados.collection.Collection(record["output"])
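+                # Read the output collection with this runner's API and Keep
+                # clients so the configured retry count applies.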
+                self.final_output = record["output"]
+                outc = arvados.collection.CollectionReader(self.final_output,
+                                                           api_client=self.arvrunner.api,
+                                                           keep_client=self.arvrunner.keep_client,
+                                                           num_retries=self.arvrunner.num_retries)
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
                 def keepify(fileobj):