From 275e7919b78fd9d19c8f6b62c8ba97052bba589c Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 2 Aug 2022 11:25:00 -0400 Subject: [PATCH] 19280: Try optimizing scandeps Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/runner.py | 263 +++++++++++++++++++++++++++++++++- 1 file changed, 260 insertions(+), 3 deletions(-) diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 2582c0a3a3..bbf8f202df 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -17,7 +17,30 @@ import json import copy from collections import namedtuple from io import StringIO -from typing import Mapping, Sequence +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Mapping, + MutableMapping, + Sequence, + MutableSequence, + Optional, + Set, + Sized, + Tuple, + Type, + Union, + cast, +) +from cwltool.utils import ( + CWLObjectType, + CWLOutputAtomType, + CWLOutputType, +) if os.name == "posix" and sys.version_info[0] < 3: import subprocess32 as subprocess @@ -28,7 +51,7 @@ from schema_salad.sourceline import SourceLine, cmap from cwltool.command_line_tool import CommandLineTool import cwltool.workflow -from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs, +from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs, shortname, Process, fill_in_defaults) from cwltool.load_tool import fetch_document from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class @@ -325,7 +348,14 @@ def upload_dependencies(arvrunner, name, document_loader, loadref, urljoin=document_loader.fetcher.urljoin, nestdirs=False) - sc_result.extend(optional_deps) + if sc_result is None: + sc_result = [] + + if optional_deps is None: + optional_deps = [] + + if optional_deps: + sc_result.extend(optional_deps) sc = [] uuids = {} @@ -858,3 +888,230 @@ class Runner(Process): self.arvrunner.output_callback({}, "permanentFail") else: 
# --- from cwltool ---

# scandeps() below calls nestdir() and mergedirs(); the visible
# "from cwltool.process import (...)" line in this module does not bring
# them into scope, so import them next to the vendored function.
from cwltool.process import mergedirs, nestdir

CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"


def scandeps(
    base: str,
    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
    reffields: Set[str],
    urlfields: Set[str],
    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
    nestdirs: bool = True,
    do_normalize: bool = True,
) -> Optional[MutableSequence[CWLObjectType]]:
    """Search a CWL document or input object for external dependencies.

    Given a CWL document or input object, search for dependencies
    (references to external files) of 'doc' and return them as a list
    of File or Directory objects.

    The 'base' is the base URL for relative references.

    Looks for objects with 'class: File' or 'class: Directory' and
    adds them to the list of dependencies.

    Anything in 'urlfields' is also added as a File dependency.

    Anything in 'reffields' (such as workflow step 'run') will be
    added as a dependency and also loaded (using the 'loadref'
    function) and recursively scanned for dependencies.  Those
    dependencies will be added as secondary files to the primary file.

    If "nestdirs" is true, create intermediate directory objects when
    a file is located in a subdirectory under the starting directory.
    This is so that if the dependencies are materialized, they will
    produce the same relative file system locations.

    If "do_normalize" is true, normalizeFilesDirs() is applied to the
    final result.  Recursive calls pass False so that normalization
    only happens once, in the outermost call.

    Returns the list of dependencies, or None when nothing was found.
    The result list is only allocated once there is something to put
    in it, so callers must be prepared to receive None.
    """

    r: Optional[MutableSequence[CWLObjectType]] = None

    if isinstance(doc, MutableMapping):
        if "id" in doc:
            if cast(str, doc["id"]).startswith("file://"):
                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
                if base != df:
                    # The object lives in a different file than 'base':
                    # record that file and resolve relative to it from here on.
                    if r is None:
                        r = []
                    r.append({"class": "File", "location": df, "format": CWL_IANA})
                    base = df

        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
            u = cast(Optional[str], doc.get("location", doc.get("path")))
            if u and not u.startswith("_:"):
                deps = {
                    "class": doc["class"],
                    "location": urljoin(base, u),
                }  # type: CWLObjectType
                if "basename" in doc:
                    deps["basename"] = doc["basename"]
                if doc["class"] == "Directory" and "listing" in doc:
                    deps["listing"] = doc["listing"]
                if doc["class"] == "File" and "secondaryFiles" in doc:
                    sd = scandeps(
                        base,
                        cast(
                            Union[CWLObjectType, MutableSequence[CWLObjectType]],
                            doc["secondaryFiles"],
                        ),
                        reffields,
                        urlfields,
                        loadref,
                        urljoin=urljoin,
                        nestdirs=nestdirs,
                        do_normalize=False,
                    )
                    if sd:
                        deps["secondaryFiles"] = cast(CWLOutputAtomType, sd)
                if nestdirs:
                    deps = nestdir(base, deps)
                if r is None:
                    r = []
                r.append(deps)
            else:
                # No usable location (missing, empty, or a "_:" id): the
                # object itself is not a dependency, but its children may be.
                if doc["class"] == "Directory" and "listing" in doc:
                    sd = scandeps(
                        base,
                        cast(MutableSequence[CWLObjectType], doc["listing"]),
                        reffields,
                        urlfields,
                        loadref,
                        urljoin=urljoin,
                        nestdirs=nestdirs,
                        do_normalize=False,
                    )
                    if sd:
                        if r is None:
                            r = []
                        r.extend(sd)
                elif doc["class"] == "File" and "secondaryFiles" in doc:
                    sd = scandeps(
                        base,
                        cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
                        reffields,
                        urlfields,
                        loadref,
                        urljoin=urljoin,
                        nestdirs=nestdirs,
                        do_normalize=False,
                    )
                    if sd:
                        if r is None:
                            r = sd
                        else:
                            r.extend(sd)

        for k, v in doc.items():
            if k in reffields:
                for u2 in aslist(v):
                    if isinstance(u2, MutableMapping):
                        # Inline (embedded) document: scan it in place.
                        sd = scandeps(
                            base,
                            u2,
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                            do_normalize=False,
                        )
                        if sd:
                            if r is None:
                                r = sd
                            else:
                                r.extend(sd)
                    else:
                        subid = urljoin(base, u2)
                        basedf, _ = urllib.parse.urldefrag(base)
                        subiddf, _ = urllib.parse.urldefrag(subid)
                        if basedf == subiddf:
                            # Reference into the same file (differs only by
                            # fragment): not an external dependency.
                            continue
                        sub = cast(
                            Union[MutableSequence[CWLObjectType], CWLObjectType],
                            loadref(base, u2),
                        )
                        deps2 = {
                            "class": "File",
                            "location": subid,
                            "format": CWL_IANA,
                        }  # type: CWLObjectType
                        sf = scandeps(
                            subid,
                            sub,
                            reffields,
                            urlfields,
                            loadref,
                            urljoin=urljoin,
                            nestdirs=nestdirs,
                            do_normalize=False,
                        )
                        if sf:
                            deps2["secondaryFiles"] = cast(
                                MutableSequence[CWLOutputAtomType], mergedirs(sf)
                            )
                        if nestdirs:
                            deps2 = nestdir(base, deps2)
                        if r is None:
                            r = []
                        r.append(deps2)
            elif k in urlfields and k != "location":
                for u3 in aslist(v):
                    deps = {"class": "File", "location": urljoin(base, u3)}
                    if nestdirs:
                        deps = nestdir(base, deps)
                    if r is None:
                        r = []
                    r.append(deps)
            elif doc.get("class") in ("File", "Directory") and k in (
                "listing",
                "secondaryFiles",
            ):
                # should be handled earlier.
                pass
            else:
                sd = scandeps(
                    base,
                    cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
                    reffields,
                    urlfields,
                    loadref,
                    urljoin=urljoin,
                    nestdirs=nestdirs,
                    do_normalize=False,
                )
                if sd:
                    if r is None:
                        r = sd
                    else:
                        r.extend(sd)
    elif isinstance(doc, MutableSequence):
        for d in doc:
            sd = scandeps(
                base,
                d,
                reffields,
                urlfields,
                loadref,
                urljoin=urljoin,
                nestdirs=nestdirs,
                do_normalize=False,
            )
            # The recursive call returns None when an element has no
            # dependencies; skip it so a later empty element cannot trigger
            # r.extend(None) after an earlier element already populated r.
            if sd:
                if r is None:
                    r = sd
                else:
                    r.extend(sd)

    if r and do_normalize:
        normalizeFilesDirs(r)

    return r