10088: Bump cwltool dependency for another bugfix to scandeps.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 72f347e439ab61b9b3f77d2fc6ac4f98bd08bd8e..155c6a05039ec65649975d038073185fbc5c4338 100644 (file)
@@ -4,6 +4,7 @@ from functools import partial
 import logging
 import json
 import re
+from cStringIO import StringIO
 
 import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
@@ -13,6 +14,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import arvados.collection
+import ruamel.yaml as yaml
 
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
@@ -23,12 +25,31 @@ cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run):
+    """Upload the dependencies of the workflowobj document to Keep.
+
+    Returns a pathmapper object mapping local paths to Keep references.  Also
+    does an in-place update of references in "workflowobj".
+
+    Use scandeps to find $import, $include, $schemas, run, File and Directory
+    fields that represent external references.
+
+    If workflowobj has an "id" field, this will reload the document to ensure
+    it is scanning the raw document prior to preprocessing.
+    """
+
     loaded = set()
     def loadref(b, u):
         joined = urlparse.urljoin(b, u)
-        if joined not in loaded:
-            loaded.add(joined)
-            return document_loader.fetch(urlparse.urljoin(b, u))
+        defrg, _ = urlparse.urldefrag(joined)
+        if defrg not in loaded:
+            loaded.add(defrg)
+            # Use fetch_text to get the raw file text (before preprocessing).
+            text = document_loader.fetch_text(defrg)
+            if isinstance(text, bytes):
+                textIO = StringIO(text.decode('utf-8'))
+            else:
+                textIO = StringIO(text)
+            return yaml.safe_load(textIO)
         else:
             return {}
 
@@ -37,9 +58,15 @@ def upload_dependencies(arvrunner, name, document_loader,
     else:
         loadref_fields = set(("$import",))
 
-    sc = scandeps(uri, workflowobj,
+    scanobj = workflowobj
+    if "id" in workflowobj:
+        # Need raw file content (before preprocessing) to ensure
+        # that external references in $include and $mixin are captured.
+        scanobj = loadref("", workflowobj["id"])
+
+    sc = scandeps(uri, scanobj,
                   loadref_fields,
-                  set(("$include", "$schemas", "path", "location")),
+                  set(("$include", "$schemas", "location")),
                   loadref)
 
     files = []