From a8f70f5f978641afa273adcbf995423228f0c7c4 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 2 Aug 2022 12:13:40 -0400 Subject: [PATCH] 19280: Only scan tools Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/runner.py | 247 +--------------------------------- 1 file changed, 7 insertions(+), 240 deletions(-) diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index e81f621750..79876008b8 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -51,7 +51,7 @@ from schema_salad.sourceline import SourceLine, cmap from cwltool.command_line_tool import CommandLineTool import cwltool.workflow -from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs, +from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process, fill_in_defaults) from cwltool.load_tool import fetch_document from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class @@ -328,9 +328,12 @@ def upload_dependencies(arvrunner, name, document_loader, scanobj = workflowobj if "id" in workflowobj and not workflowobj["id"].startswith("_:"): - # Need raw file content (before preprocessing) to ensure - # that external references in $include and $mixin are captured. - scanobj = loadref("", workflowobj["id"]) + defrg, _ = urllib.parse.urldefrag(workflowobj["id"]) + if cache is not None and defrg in cache: + # if we haven't seen this file before, want raw file + # content (before preprocessing) to ensure that external + # references like $include haven't already been inlined. + scanobj = loadref("", workflowobj["id"]) metadata = scanobj @@ -888,239 +891,3 @@ class Runner(Process): self.arvrunner.output_callback({}, "permanentFail") else: self.arvrunner.output_callback(outputs, processStatus) - - - - -# --- from cwltool --- - - -CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl" - - -def scandeps( - base: str, - doc: Union[CWLObjectType, MutableSequence[CWLObjectType]], - reffields: Set[str], - urlfields: Set[str], - loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]], - urljoin: Callable[[str, str], str] = urllib.parse.urljoin, - nestdirs: bool = True, - do_normalize: bool = True, -) -> Optional[MutableSequence[CWLObjectType]]: - - """Given a CWL document or input object, search for dependencies - (references to external files) of 'doc' and return them as a list - of File or Directory objects. - - The 'base' is the base URL for relative references. - - Looks for objects with 'class: File' or 'class: Directory' and - adds them to the list of dependencies. - - Anything in 'urlfields' is also added as a File dependency. - - Anything in 'reffields' (such as workflow step 'run') will be - added as a dependency and also loaded (using the 'loadref' - function) and recursively scanned for dependencies. Those - dependencies will be added as secondary files to the primary file. - - If "nestdirs" is true, create intermediate directory objects when - a file is located in a subdirectory under the starting directory. - This is so that if the dependencies are materialized, they will - produce the same relative file system locations. - - """ - - if do_normalize: - import pprint - pprint.pprint(doc) - - r: Optional[MutableSequence[CWLObjectType]] = None - if isinstance(doc, MutableMapping): - if "id" in doc: - if cast(str, doc["id"]).startswith("file://"): - df, _ = urllib.parse.urldefrag(cast(str, doc["id"])) - if base != df: - if r is None: - r = [] - r.append({"class": "File", "location": df, "format": CWL_IANA}) - base = df - - if doc.get("class") in ("File", "Directory") and "location" in urlfields: - with Perf(metrics, "File or Directory with location"): - u = cast(Optional[str], doc.get("location", doc.get("path"))) - if u and not u.startswith("_:"): - deps = { - "class": doc["class"], - "location": urljoin(base, u), - } # type: CWLObjectType - if "basename" in doc: - deps["basename"] = doc["basename"] - if doc["class"] == "Directory" and "listing" in doc: - deps["listing"] = doc["listing"] - if doc["class"] == "File" and "secondaryFiles" in doc: - sd = scandeps( - base, - cast( - Union[CWLObjectType, MutableSequence[CWLObjectType]], - doc["secondaryFiles"], - ), - reffields, - urlfields, - loadref, - urljoin=urljoin, - nestdirs=nestdirs, - do_normalize=False, - ) - if sd: - deps["secondaryFiles"] = cast( - CWLOutputAtomType, - sd - ) - if nestdirs: - deps = nestdir(base, deps) - if r is None: - r = [] - r.append(deps) - else: - if doc["class"] == "Directory" and "listing" in doc: - sd = scandeps( - base, - cast(MutableSequence[CWLObjectType], doc["listing"]), - reffields, - urlfields, - loadref, - urljoin=urljoin, - nestdirs=nestdirs, - do_normalize=False, - ) - if sd: - if r is None: - r = [] - r.extend(sd) - elif doc["class"] == "File" and "secondaryFiles" in doc: - sd = scandeps( - base, - cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]), - reffields, - urlfields, - loadref, - urljoin=urljoin, - nestdirs=nestdirs, - do_normalize=False, - ) - if sd: - if r is None: - r = sd - else: - r.extend(sd) - - for k, v in doc.items(): - if k in reffields: - with Perf(metrics, "k in reffields"): - for u2 in aslist(v): - if isinstance(u2, MutableMapping): - sd = scandeps( - base, - u2, - reffields, - urlfields, - loadref, - urljoin=urljoin, - nestdirs=nestdirs, - do_normalize=False, - ) - if sd: - if r is None: - r = sd - else: - r.extend(sd) - else: - subid = urljoin(base, u2) - basedf, _ = urllib.parse.urldefrag(base) - subiddf, _ = urllib.parse.urldefrag(subid) - if basedf == subiddf: - continue - sub = cast( - Union[MutableSequence[CWLObjectType], CWLObjectType], - loadref(base, u2), - ) - deps2 = { - "class": "File", - "location": subid, - "format": CWL_IANA, - } # type: CWLObjectType - sf = scandeps( - subid, - sub, - reffields, - urlfields, - loadref, - urljoin=urljoin, - nestdirs=nestdirs, - do_normalize=False, - ) - if sf: - deps2["secondaryFiles"] = cast( - MutableSequence[CWLOutputAtomType], mergedirs(sf) - ) - if nestdirs: - deps2 = nestdir(base, deps2) - if r is None: - r = [] - r.append(deps2) - elif k in urlfields and k != "location": - with Perf(metrics, "k in urlfields"): - for u3 in aslist(v): - deps = {"class": "File", "location": urljoin(base, u3)} - if nestdirs: - deps = nestdir(base, deps) - if r is None: - r = [] - r.append(deps) - elif doc.get("class") in ("File", "Directory") and k in ( - "listing", - "secondaryFiles", - ): - # should be handled earlier. - pass - else: - with Perf(metrics, "k is something else"): - sd = scandeps( - base, - cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v), - reffields, - urlfields, - loadref, - urljoin=urljoin, - nestdirs=nestdirs, - do_normalize=False, - ) - if sd: - if r is None: - r = sd - else: - r.extend(sd) - elif isinstance(doc, MutableSequence): - with Perf(metrics, "d in doc"): - for d in doc: - sd = scandeps( - base, - d, - reffields, - urlfields, - loadref, - urljoin=urljoin, - nestdirs=nestdirs, - do_normalize=False, - ) - if r is None: - r = sd - else: - r.extend(sd) - - if r and do_normalize: - normalizeFilesDirs(r) - - return r -- 2.30.2