10088: upload_dependencies reads raw files to find actual dependencies
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 629b1042bb75400b9e8c6b05dacd65e3876362fc..570675bc4e9ca097416897cf13deffde14971e0d 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -3,19 +3,95 @@ import urlparse
 from functools import partial
 import logging
 import json
+import re
+from StringIO import StringIO  # pure-Python StringIO accepts unicode text, unlike cStringIO
 
+import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
 from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import arvados.collection
+import ruamel.yaml as yaml
 
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+# Override the regular expression cwltool uses to validate file names.
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
+def upload_dependencies(arvrunner, name, document_loader,
+                        workflowobj, uri, loadref_run):
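+    """Upload the dependencies of workflowobj to Keep.
+
+    Scans the document for $import, $include and $schemas references (and
+    "run" references when loadref_run is true) as well as File and Directory
+    objects, uploads them via ArvPathMapper, and rewrites the references in
+    workflowobj in place to point at their keep: locations.
+
+    Returns the pathmapper.
+    """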
+    loaded = set()
+    def loadref(b, u):
+        joined = urlparse.urljoin(b, u)
+        defrg, _ = urlparse.urldefrag(joined)
+        if defrg not in loaded:
+            loaded.add(defrg)
+            # Use fetch_text to get raw file (before preprocessing).
+            text = document_loader.fetch_text(defrg)
+            if isinstance(text, bytes):
+                textIO = StringIO(text.decode('utf-8'))
+            else:
+                textIO = StringIO(text)
+            return yaml.safe_load(textIO)
+        else:
+            return {}
+
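+    # "run" references (embedded tools and subworkflows) are followed only
+    # when loadref_run is true.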
+    if loadref_run:
+        loadref_fields = set(("$import", "run"))
+    else:
+        loadref_fields = set(("$import",))
+
+    scanobj = workflowobj
+    if "id" in workflowobj:
+        # Need raw file content (before preprocessing) to ensure
+        # that external references in $include and $mixin are captured.
+        scanobj = loadref("", workflowobj["id"])
+
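+    # Collect everything the document references: $import/$include/$schemas
+    # links plus File and Directory objects.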
+    sc = scandeps(uri, scanobj,
+                  loadref_fields,
+                  set(("$include", "$schemas")),
+                  loadref)
+
+    files = []
+    def visit(obj):
+        files.append(obj)
+
+    adjustFileObjs(sc, visit)
+    adjustDirObjs(sc, visit)
+
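+    # Normalize the collected File/Directory objects (e.g. convert "path"
+    # fields to "location").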
+    normalizeFilesDirs(files)
+
+    if "id" in workflowobj:
+        files.append({"class": "File", "location": workflowobj["id"]})
+
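+    # Upload the files to Keep.  "keep:%s" and "keep:%s/%s" are the patterns
+    # used to build references to a collection and to a file within one.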
+    mapper = ArvPathMapper(arvrunner, files, "",
+                           "keep:%s",
+                           "keep:%s/%s",
+                           name=name)
+
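+    # Rewrite the File and Directory references in workflowobj to their
+    # mapped keep: locations.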
+    def setloc(p):
+        p["location"] = mapper.mapper(p["location"]).target
+    adjustFileObjs(workflowobj, setloc)
+    adjustDirObjs(workflowobj, setloc)
+
+    return mapper
+
+
+def upload_docker(arvrunner, tool):
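+    """Ensure Docker images named in any DockerRequirement are available
+    in Arvados, recursing into the steps of a Workflow."""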
+    if isinstance(tool, CommandLineTool):
+        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+        if docker_req:
+            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+    elif isinstance(tool, cwltool.workflow.Workflow):
+        for s in tool.steps:
+            upload_docker(arvrunner, s.embedded_tool)
+
+
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
@@ -28,60 +104,24 @@ class Runner(object):
     def update_pipeline_component(self, record):
         pass
 
-    def upload_docker(self, tool):
-        if isinstance(tool, CommandLineTool):
-            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
-            if docker_req:
-                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
-        elif isinstance(tool, cwltool.workflow.Workflow):
-            for s in tool.steps:
-                self.upload_docker(s.embedded_tool)
-
-
     def arvados_job_spec(self, *args, **kwargs):
-        self.upload_docker(self.tool)
-
-        workflowfiles = set()
-        jobfiles = set()
-        workflowfiles.add(self.tool.tool["id"])
+        upload_docker(self.arvrunner, self.tool)
 
         self.name = os.path.basename(self.tool.tool["id"])
 
-        def visitFiles(files, path):
-            files.add(path)
-            return path
-
-        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
-        loaded = set()
-        def loadref(b, u):
-            joined = urlparse.urljoin(b, u)
-            if joined not in loaded:
-                loaded.add(joined)
-                return document_loader.fetch(urlparse.urljoin(b, u))
-            else:
-                return {}
-
-        sc = scandeps(uri, workflowobj,
-                      set(("$import", "run")),
-                      set(("$include", "$schemas", "path")),
-                      loadref)
-        adjustFiles(sc, partial(visitFiles, workflowfiles))
-        adjustFiles(self.job_order, partial(visitFiles, jobfiles))
-
-        keepprefix = kwargs.get("keepprefix", "")
-        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
-                                       keepprefix+"%s",
-                                       keepprefix+"%s/%s",
-                                       name=self.name,
-                                       **kwargs)
-
-        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
-                                  keepprefix+"%s",
-                                  keepprefix+"%s/%s",
-                                  name=os.path.basename(self.job_order.get("id", "#")),
-                                  **kwargs)
-
-        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
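+        # Upload the workflow document itself and everything it references,
+        # following "run".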
+        workflowmapper = upload_dependencies(self.arvrunner,
+                                             self.name,
+                                             self.tool.doc_loader,
+                                             self.tool.tool,
+                                             self.tool.tool["id"],
+                                             True)
+
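+        # Upload files referenced by the job order; "run" references do not
+        # apply here.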
+        jobmapper = upload_dependencies(self.arvrunner,
+                                        os.path.basename(self.job_order.get("id", "#")),
+                                        self.tool.doc_loader,
+                                        self.job_order,
+                                        self.job_order.get("id", "#"),
+                                        False)
 
         if "id" in self.job_order:
             del self.job_order["id"]
@@ -109,12 +149,12 @@ class Runner(object):
                 outc = arvados.collection.Collection(record["output"])
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
-                def keepify(path):
+                def keepify(fileobj):
+                    path = fileobj["location"]
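+                    # A location without a keep: prefix is relative to the
+                    # output collection.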
                     if not path.startswith("keep:"):
-                        return "keep:%s/%s" % (record["output"], path)
-                    else:
-                        return path
-                adjustFiles(outputs, keepify)
+                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+                adjustFileObjs(outputs, keepify)
+                adjustDirObjs(outputs, keepify)
             except Exception as e:
                 logger.error("While getting final output object: %s", e)
             self.arvrunner.output_callback(outputs, processStatus)