19280: what are we scanning here actually
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 934aeb4018ab89f427676a741e7f113896900d12..e81f621750c8e2f8c7c310918b8f0725a68df9db 100644 (file)
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -28,7 +51,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
@@ -311,19 +334,28 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
 
-    optional_deps = scandeps(uri, scanobj,
-                                  loadref_fields,
-                                  set(("$schemas",)),
-                                  loadref, urljoin=document_loader.fetcher.urljoin,
-                                  nestdirs=False)
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                      loadref_fields,
+                                      set(("$schemas",)),
+                                      loadref, urljoin=document_loader.fetcher.urljoin,
+                                      nestdirs=False)
 
-    sc_result.extend(optional_deps)
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -351,30 +383,34 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
         collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching from API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching until API server has nothing
+            # more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
@@ -420,12 +456,13 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
 
     keeprefs = set()
     def addkeepref(k):
@@ -469,8 +506,9 @@ def upload_dependencies(arvrunner, name, document_loader,
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -665,7 +703,7 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
     def upload_tool_deps(deptool):
         if "id" in deptool:
             discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"]):
+            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
                 pm = upload_dependencies(arvrunner,
                                          "%s dependencies" % (shortname(deptool["id"])),
                                          document_loader,
@@ -850,3 +888,239 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
+
+
+# --- from cwltool ---
+
+
+CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
+
+
+def scandeps(
+    base: str,
+    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
+    reffields: Set[str],
+    urlfields: Set[str],
+    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
+    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
+    nestdirs: bool = True,
+    do_normalize: bool = True,
+) -> Optional[MutableSequence[CWLObjectType]]:
+    """Given a CWL document or input object, search for dependencies
+    (references to external files) of 'doc' and return them as a list
+    of File or Directory objects.
+
+    The 'base' is the base URL for relative references.
+
+    Looks for objects with 'class: File' or 'class: Directory' and
+    adds them to the list of dependencies.
+
+    Anything in 'urlfields' is also added as a File dependency.
+
+    Anything in 'reffields' (such as workflow step 'run') will be
+    added as a dependency and also loaded (using the 'loadref'
+    function) and recursively scanned for dependencies.  Those
+    dependencies will be added as secondary files to the primary file.
+
+    If "nestdirs" is true, create intermediate directory objects when
+    a file is located in a subdirectory under the starting directory.
+    This is so that if the dependencies are materialized, they will
+    produce the same relative file system locations.
+
+    If "do_normalize" is true, the collected dependencies are passed
+    through normalizeFilesDirs() before returning.  Recursive calls
+    pass False so normalization only runs once, on the final result.
+
+    Returns None (not an empty list) when no dependencies are found.
+
+    """
+
+    r: Optional[MutableSequence[CWLObjectType]] = None
+    if isinstance(doc, MutableMapping):
+        if "id" in doc:
+            if cast(str, doc["id"]).startswith("file://"):
+                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
+                if base != df:
+                    if r is None:
+                        r = []
+                    r.append({"class": "File", "location": df, "format": CWL_IANA})
+                    base = df
+
+        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
+            with Perf(metrics, "File or Directory with location"):
+                u = cast(Optional[str], doc.get("location", doc.get("path")))
+                if u and not u.startswith("_:"):
+                    deps = {
+                        "class": doc["class"],
+                        "location": urljoin(base, u),
+                    }  # type: CWLObjectType
+                    if "basename" in doc:
+                        deps["basename"] = doc["basename"]
+                    if doc["class"] == "Directory" and "listing" in doc:
+                        deps["listing"] = doc["listing"]
+                    if doc["class"] == "File" and "secondaryFiles" in doc:
+                        sd = scandeps(
+                            base,
+                            cast(
+                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
+                                doc["secondaryFiles"],
+                            ),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        )
+                        if sd:
+                            deps["secondaryFiles"] = cast(CWLOutputAtomType, sd)
+                    if nestdirs:
+                        deps = nestdir(base, deps)
+                    if r is None:
+                        r = []
+                    r.append(deps)
+                else:
+                    if doc["class"] == "Directory" and "listing" in doc:
+                        sd = scandeps(
+                                base,
+                                cast(MutableSequence[CWLObjectType], doc["listing"]),
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                        if sd:
+                            if r is None:
+                                r = []
+                            r.extend(sd)
+                    elif doc["class"] == "File" and "secondaryFiles" in doc:
+                        sd = scandeps(
+                                base,
+                                cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                        if sd:
+                            if r is None:
+                                r = sd
+                            else:
+                                r.extend(sd)
+
+        for k, v in doc.items():
+            if k in reffields:
+                with Perf(metrics, "k in reffields"):
+                    for u2 in aslist(v):
+                        if isinstance(u2, MutableMapping):
+                            sd = scandeps(
+                                    base,
+                                    u2,
+                                    reffields,
+                                    urlfields,
+                                    loadref,
+                                    urljoin=urljoin,
+                                    nestdirs=nestdirs,
+                                    do_normalize=False,
+                                )
+                            if sd:
+                                if r is None:
+                                    r = sd
+                                else:
+                                    r.extend(sd)
+                        else:
+                            subid = urljoin(base, u2)
+                            basedf, _ = urllib.parse.urldefrag(base)
+                            subiddf, _ = urllib.parse.urldefrag(subid)
+                            if basedf == subiddf:
+                                continue
+                            sub = cast(
+                                Union[MutableSequence[CWLObjectType], CWLObjectType],
+                                loadref(base, u2),
+                            )
+                            deps2 = {
+                                "class": "File",
+                                "location": subid,
+                                "format": CWL_IANA,
+                            }  # type: CWLObjectType
+                            sf = scandeps(
+                                subid,
+                                sub,
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                            if sf:
+                                deps2["secondaryFiles"] = cast(
+                                    MutableSequence[CWLOutputAtomType], mergedirs(sf)
+                                )
+                            if nestdirs:
+                                deps2 = nestdir(base, deps2)
+                            if r is None:
+                                r = []
+                            r.append(deps2)
+            elif k in urlfields and k != "location":
+                with Perf(metrics, "k in urlfields"):
+                    for u3 in aslist(v):
+                        deps = {"class": "File", "location": urljoin(base, u3)}
+                        if nestdirs:
+                            deps = nestdir(base, deps)
+                        if r is None:
+                            r = []
+                        r.append(deps)
+            elif doc.get("class") in ("File", "Directory") and k in (
+                "listing",
+                "secondaryFiles",
+            ):
+                # should be handled earlier.
+                pass
+            else:
+                with Perf(metrics, "k is something else"):
+                    sd = scandeps(
+                            base,
+                            cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        )
+                    if sd:
+                        if r is None:
+                            r = sd
+                        else:
+                            r.extend(sd)
+    elif isinstance(doc, MutableSequence):
+        with Perf(metrics, "d in doc"):
+            for d in doc:
+                sd = scandeps(
+                        base,
+                        d,
+                        reffields,
+                        urlfields,
+                        loadref,
+                        urljoin=urljoin,
+                        nestdirs=nestdirs,
+                        do_normalize=False,
+                    )
+                # A nested scan returns None when it finds nothing.
+                if sd:
+                    if r is None:
+                        r = sd
+                    else:
+                        r.extend(sd)
+
+    if r and do_normalize:
+        normalizeFilesDirs(r)
+
+    return r