10088: upload_dependencies reads raw files to find actual dependencies
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 315be0c8b36c10a8bd65bf9960724df6011df9e6..570675bc4e9ca097416897cf13deffde14971e0d 100644 (file)
@@ -4,6 +4,7 @@ from functools import partial
 import logging
 import json
 import re
+from cStringIO import StringIO
 
 import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
@@ -13,6 +14,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import arvados.collection
+import ruamel.yaml as yaml
 
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
@@ -22,13 +24,20 @@ logger = logging.getLogger('arvados.cwl-runner')
 cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, keepprefix, loadref_run):
+                        workflowobj, uri, loadref_run):
     loaded = set()
     def loadref(b, u):
         joined = urlparse.urljoin(b, u)
-        if joined not in loaded:
-            loaded.add(joined)
-            return document_loader.fetch(urlparse.urljoin(b, u))
+        defrg, _ = urlparse.urldefrag(joined)
+        if defrg not in loaded:
+            loaded.add(defrg)
+            # Use fetch_text to get raw file (before preprocessing).
+            text = document_loader.fetch_text(defrg)
+            if isinstance(text, bytes):
+                textIO = StringIO(text.decode('utf-8'))
+            else:
+                textIO = StringIO(text)
+            return yaml.safe_load(textIO)
         else:
             return {}
 
@@ -37,9 +46,15 @@ def upload_dependencies(arvrunner, name, document_loader,
     else:
         loadref_fields = set(("$import",))
 
-    sc = scandeps(uri, workflowobj,
+    scanobj = workflowobj
+    if "id" in workflowobj:
+        # Need raw file content (before preprocessing) to ensure
+        # that external references in $include and $mixin are captured.
+        scanobj = loadref("", workflowobj["id"])
+
+    sc = scandeps(uri, scanobj,
                   loadref_fields,
-                  set(("$include", "$schemas", "path", "location")),
+                  set(("$include", "$schemas")),
                   loadref)
 
     files = []
@@ -51,9 +66,12 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     normalizeFilesDirs(files)
 
+    if "id" in workflowobj:
+        files.append({"class": "File", "location": workflowobj["id"]})
+
     mapper = ArvPathMapper(arvrunner, files, "",
-                           keepprefix+"%s",
-                           keepprefix+"%s/%s",
+                           "keep:%s",
+                           "keep:%s/%s",
                            name=name)
 
     def setloc(p):
@@ -96,7 +114,6 @@ class Runner(object):
                                              self.tool.doc_loader,
                                              self.tool.tool,
                                              self.tool.tool["id"],
-                                             kwargs.get("keepprefix", ""),
                                              True)
 
         jobmapper = upload_dependencies(self.arvrunner,
@@ -104,7 +121,6 @@ class Runner(object):
                                         self.tool.doc_loader,
                                         self.job_order,
                                         self.job_order.get("id", "#"),
-                                        kwargs.get("keepprefix", ""),
                                         False)
 
         if "id" in self.job_order: