19280: Try optimizing scandeps
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 2 Aug 2022 15:25:00 +0000 (11:25 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 2 Aug 2022 15:25:00 +0000 (11:25 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/runner.py

index 2582c0a3a3e3fd88c464b58a23c5d74b0281b92e..bbf8f202df5bf2b4b0220efc0192e12f929321d5 100644 (file)
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -28,7 +51,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
@@ -325,7 +348,14 @@ def upload_dependencies(arvrunner, name, document_loader,
                                       loadref, urljoin=document_loader.fetcher.urljoin,
                                       nestdirs=False)
 
-    sc_result.extend(optional_deps)
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -858,3 +888,230 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
+
+
+# --- adapted from cwltool.process.scandeps ---
+
+
+CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
+
+
+def _merge_deps(
+    found: Optional[MutableSequence[CWLObjectType]],
+    more: Optional[MutableSequence[CWLObjectType]],
+) -> Optional[MutableSequence[CWLObjectType]]:
+    """Fold the result of a nested scan into the running result.
+
+    'found' is what has been collected so far (None when nothing has
+    been collected yet) and 'more' is the return value of a nested
+    scandeps() call (None or empty when that scan found nothing).
+
+    Returns 'found' extended in place with 'more', or 'more' itself
+    when 'found' is None.  No list is allocated here.
+    """
+    if not more:
+        return found
+    if found is None:
+        return more
+    found.extend(more)
+    return found
+
+
+def scandeps(
+    base: str,
+    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
+    reffields: Set[str],
+    urlfields: Set[str],
+    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
+    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
+    nestdirs: bool = True,
+    do_normalize: bool = True,
+) -> Optional[MutableSequence[CWLObjectType]]:
+
+    """Given a CWL document or input object, search for dependencies
+    (references to external files) of 'doc' and return them as a list
+    of File or Directory objects.
+
+    The 'base' is the base URL for relative references.
+
+    Looks for objects with 'class: File' or 'class: Directory' and
+    adds them to the list of dependencies.  This only happens when
+    "location" is a member of 'urlfields'.
+
+    Anything in 'urlfields' is also added as a File dependency.
+
+    Anything in 'reffields' (such as workflow step 'run') will be
+    added as a dependency and also loaded (using the 'loadref'
+    function) and recursively scanned for dependencies.  Those
+    dependencies will be added as secondary files to the primary file.
+
+    'urljoin' resolves a reference against a base URL.
+
+    If "nestdirs" is true, create intermediate directory objects when
+    a file is located in a subdirectory under the starting directory.
+    This is so that if the dependencies are materialized, they will
+    produce the same relative file system locations.
+
+    If "do_normalize" is true and at least one dependency was found,
+    normalizeFilesDirs() is applied to the result before returning.
+    Recursive calls always pass do_normalize=False, so only the
+    outermost call normalizes.
+
+    Returns the list of dependencies, or None when nothing was found.
+    The list is only allocated once there is something to put in it.
+
+    """
+
+    r: Optional[MutableSequence[CWLObjectType]] = None
+    if isinstance(doc, MutableMapping):
+        if "id" in doc:
+            if cast(str, doc["id"]).startswith("file://"):
+                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
+                if base != df:
+                    # The object lives in a different file than 'base':
+                    # record that file and resolve everything below
+                    # relative to it.
+                    if r is None:
+                        r = []
+                    r.append({"class": "File", "location": df, "format": CWL_IANA})
+                    base = df
+
+        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
+            u = cast(Optional[str], doc.get("location", doc.get("path")))
+            if u and not u.startswith("_:"):
+                deps = {
+                    "class": doc["class"],
+                    "location": urljoin(base, u),
+                }  # type: CWLObjectType
+                if "basename" in doc:
+                    deps["basename"] = doc["basename"]
+                if doc["class"] == "Directory" and "listing" in doc:
+                    deps["listing"] = doc["listing"]
+                if doc["class"] == "File" and "secondaryFiles" in doc:
+                    sd = scandeps(
+                        base,
+                        cast(
+                            Union[CWLObjectType, MutableSequence[CWLObjectType]],
+                            doc["secondaryFiles"],
+                        ),
+                        reffields,
+                        urlfields,
+                        loadref,
+                        urljoin=urljoin,
+                        nestdirs=nestdirs,
+                        do_normalize=False,
+                    )
+                    if sd:
+                        deps["secondaryFiles"] = cast(
+                            CWLOutputAtomType,
+                            sd
+                        )
+                if nestdirs:
+                    deps = nestdir(base, deps)
+                if r is None:
+                    r = []
+                r.append(deps)
+            else:
+                # No usable location (missing, empty, or starting with
+                # "_:" -- presumably a literal/anonymous object, confirm
+                # against cwltool).  The object itself is not recorded,
+                # but whatever it contains is still scanned.
+                if doc["class"] == "Directory" and "listing" in doc:
+                    r = _merge_deps(
+                        r,
+                        scandeps(
+                            base,
+                            cast(MutableSequence[CWLObjectType], doc["listing"]),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        ),
+                    )
+                elif doc["class"] == "File" and "secondaryFiles" in doc:
+                    r = _merge_deps(
+                        r,
+                        scandeps(
+                            base,
+                            cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        ),
+                    )
+
+        for k, v in doc.items():
+            if k in reffields:
+                for u2 in aslist(v):
+                    if isinstance(u2, MutableMapping):
+                        # Embedded object: scan it in place.
+                        r = _merge_deps(
+                            r,
+                            scandeps(
+                                base,
+                                u2,
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            ),
+                        )
+                    else:
+                        subid = urljoin(base, u2)
+                        basedf, _ = urllib.parse.urldefrag(base)
+                        subiddf, _ = urllib.parse.urldefrag(subid)
+                        if basedf == subiddf:
+                            # Reference into the same file (only the
+                            # fragment differs): not an external dependency.
+                            continue
+                        sub = cast(
+                            Union[MutableSequence[CWLObjectType], CWLObjectType],
+                            loadref(base, u2),
+                        )
+                        deps2 = {
+                            "class": "File",
+                            "location": subid,
+                            "format": CWL_IANA,
+                        }  # type: CWLObjectType
+                        # Dependencies of the referenced document hang off
+                        # it as secondaryFiles.
+                        sf = scandeps(
+                            subid,
+                            sub,
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        )
+                        if sf:
+                            deps2["secondaryFiles"] = cast(
+                                MutableSequence[CWLOutputAtomType], mergedirs(sf)
+                            )
+                        if nestdirs:
+                            deps2 = nestdir(base, deps2)
+                        if r is None:
+                            r = []
+                        r.append(deps2)
+            elif k in urlfields and k != "location":
+                for u3 in aslist(v):
+                    deps = {"class": "File", "location": urljoin(base, u3)}
+                    if nestdirs:
+                        deps = nestdir(base, deps)
+                    if r is None:
+                        r = []
+                    r.append(deps)
+            elif doc.get("class") in ("File", "Directory") and k in (
+                "listing",
+                "secondaryFiles",
+            ):
+                # should be handled earlier.
+                pass
+            else:
+                r = _merge_deps(
+                    r,
+                    scandeps(
+                        base,
+                        cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
+                        reffields,
+                        urlfields,
+                        loadref,
+                        urljoin=urljoin,
+                        nestdirs=nestdirs,
+                        do_normalize=False,
+                    ),
+                )
+    elif isinstance(doc, MutableSequence):
+        for d in doc:
+            # An element may contribute nothing (the nested call returns
+            # None).  _merge_deps skips it; calling extend(None) on an
+            # already populated list would raise TypeError.
+            r = _merge_deps(
+                r,
+                scandeps(
+                    base,
+                    d,
+                    reffields,
+                    urlfields,
+                    loadref,
+                    urljoin=urljoin,
+                    nestdirs=nestdirs,
+                    do_normalize=False,
+                ),
+            )
+
+    if r and do_normalize:
+        normalizeFilesDirs(r)
+
+    return r