import copy
from collections import namedtuple
from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ MutableMapping,
+ Sequence,
+ MutableSequence,
+ Optional,
+ Set,
+ Sized,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
+# Used by the copy of cwltool's scandeps() below.
+import urllib.parse
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
if os.name == "posix" and sys.version_info[0] < 3:
import subprocess32 as subprocess
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
metadata = scanobj
- sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ with Perf(metrics, "scandeps include, location"):
+ sc_result = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$include", "location")),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
- optional_deps = scandeps(uri, scanobj,
- loadref_fields,
- set(("$schemas",)),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ with Perf(metrics, "scandeps $schemas"):
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
- sc_result.extend(optional_deps)
+ if sc_result is None:
+ sc_result = []
+
+ if optional_deps is None:
+ optional_deps = []
+
+ if optional_deps:
+ sc_result.extend(optional_deps)
sc = []
uuids = {}
sc.append(obj)
collect_uuids(obj)
- visit_class(workflowobj, ("File", "Directory"), collect_uuids)
- visit_class(sc_result, ("File", "Directory"), collect_uploads)
+ with Perf(metrics, "collect uuids"):
+ visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+ with Perf(metrics, "collect uploads"):
+ visit_class(sc_result, ("File", "Directory"), collect_uploads)
# Resolve any collection uuids we found to portable data hashes
# and assign them to uuid_map
uuid_map = {}
fetch_uuids = list(uuids.keys())
- while fetch_uuids:
- # For a large number of fetch_uuids, API server may limit
- # response size, so keep fetching from API server has nothing
- # more to give us.
- lookups = arvrunner.api.collections().list(
- filters=[["uuid", "in", fetch_uuids]],
- count="none",
- select=["uuid", "portable_data_hash"]).execute(
- num_retries=arvrunner.num_retries)
+ with Perf(metrics, "fetch_uuids"):
+ while fetch_uuids:
+            # For a large number of fetch_uuids, the API server may limit
+            # the response size, so keep fetching until the API server has
+            # nothing more to give us.
+ lookups = arvrunner.api.collections().list(
+ filters=[["uuid", "in", fetch_uuids]],
+ count="none",
+ select=["uuid", "portable_data_hash"]).execute(
+ num_retries=arvrunner.num_retries)
- if not lookups["items"]:
- break
+ if not lookups["items"]:
+ break
- for l in lookups["items"]:
- uuid_map[l["uuid"]] = l["portable_data_hash"]
+ for l in lookups["items"]:
+ uuid_map[l["uuid"]] = l["portable_data_hash"]
- fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+ fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
normalizeFilesDirs(sc)
else:
del discovered[d]
- mapper = ArvPathMapper(arvrunner, sc, "",
- "keep:%s",
- "keep:%s/%s",
- name=name,
- single_collection=True,
- optional_deps=optional_deps)
+ with Perf(metrics, "mapper"):
+ mapper = ArvPathMapper(arvrunner, sc, "",
+ "keep:%s",
+ "keep:%s/%s",
+ name=name,
+ single_collection=True,
+ optional_deps=optional_deps)
keeprefs = set()
def addkeepref(k):
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
p[collectionUUID] = uuid
- visit_class(workflowobj, ("File", "Directory"), setloc)
- visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), setloc)
+ visit_class(discovered, ("File", "Directory"), setloc)
if discovered_secondaryfiles is not None:
for d in discovered:
def upload_tool_deps(deptool):
if "id" in deptool:
discovered_secondaryfiles = {}
- with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"]):
+ with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
pm = upload_dependencies(arvrunner,
"%s dependencies" % (shortname(deptool["id"])),
document_loader,
self.arvrunner.output_callback({}, "permanentFail")
else:
self.arvrunner.output_callback(outputs, processStatus)
+
+
+# --- from cwltool ---
+
+
+CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
+
+
+def scandeps(
+ base: str,
+ doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
+ reffields: Set[str],
+ urlfields: Set[str],
+ loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
+ urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
+ nestdirs: bool = True,
+ do_normalize: bool = True,
+) -> Optional[MutableSequence[CWLObjectType]]:
+
+    """Given a CWL document or input object, search for dependencies
+    (references to external files) of 'doc' and return them as a list
+    of File or Directory objects, or None if no dependencies are found.
+
+ The 'base' is the base URL for relative references.
+
+ Looks for objects with 'class: File' or 'class: Directory' and
+ adds them to the list of dependencies.
+
+ Anything in 'urlfields' is also added as a File dependency.
+
+ Anything in 'reffields' (such as workflow step 'run') will be
+ added as a dependency and also loaded (using the 'loadref'
+ function) and recursively scanned for dependencies. Those
+ dependencies will be added as secondary files to the primary file.
+
+ If "nestdirs" is true, create intermediate directory objects when
+ a file is located in a subdirectory under the starting directory.
+ This is so that if the dependencies are materialized, they will
+ produce the same relative file system locations.
+
+ """
+
+ r: Optional[MutableSequence[CWLObjectType]] = None
+ if isinstance(doc, MutableMapping):
+ if "id" in doc:
+ if cast(str, doc["id"]).startswith("file://"):
+ df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
+ if base != df:
+ if r is None:
+ r = []
+ r.append({"class": "File", "location": df, "format": CWL_IANA})
+ base = df
+
+ if doc.get("class") in ("File", "Directory") and "location" in urlfields:
+ with Perf(metrics, "File or Directory with location"):
+ u = cast(Optional[str], doc.get("location", doc.get("path")))
+ if u and not u.startswith("_:"):
+ deps = {
+ "class": doc["class"],
+ "location": urljoin(base, u),
+ } # type: CWLObjectType
+ if "basename" in doc:
+ deps["basename"] = doc["basename"]
+ if doc["class"] == "Directory" and "listing" in doc:
+ deps["listing"] = doc["listing"]
+ if doc["class"] == "File" and "secondaryFiles" in doc:
+ sd = scandeps(
+ base,
+ cast(
+ Union[CWLObjectType, MutableSequence[CWLObjectType]],
+ doc["secondaryFiles"],
+ ),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ deps["secondaryFiles"] = cast(
+ CWLOutputAtomType,
+ sd
+ )
+ if nestdirs:
+ deps = nestdir(base, deps)
+ if r is None:
+ r = []
+ r.append(deps)
+ else:
+ if doc["class"] == "Directory" and "listing" in doc:
+ sd = scandeps(
+ base,
+ cast(MutableSequence[CWLObjectType], doc["listing"]),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = []
+ r.extend(sd)
+ elif doc["class"] == "File" and "secondaryFiles" in doc:
+ sd = scandeps(
+ base,
+ cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = sd
+ else:
+ r.extend(sd)
+
+ for k, v in doc.items():
+ if k in reffields:
+ with Perf(metrics, "k in reffields"):
+ for u2 in aslist(v):
+ if isinstance(u2, MutableMapping):
+ sd = scandeps(
+ base,
+ u2,
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = sd
+ else:
+ r.extend(sd)
+ else:
+ subid = urljoin(base, u2)
+ basedf, _ = urllib.parse.urldefrag(base)
+ subiddf, _ = urllib.parse.urldefrag(subid)
+ if basedf == subiddf:
+ continue
+ sub = cast(
+ Union[MutableSequence[CWLObjectType], CWLObjectType],
+ loadref(base, u2),
+ )
+ deps2 = {
+ "class": "File",
+ "location": subid,
+ "format": CWL_IANA,
+ } # type: CWLObjectType
+ sf = scandeps(
+ subid,
+ sub,
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sf:
+ deps2["secondaryFiles"] = cast(
+ MutableSequence[CWLOutputAtomType], mergedirs(sf)
+ )
+ if nestdirs:
+ deps2 = nestdir(base, deps2)
+ if r is None:
+ r = []
+ r.append(deps2)
+ elif k in urlfields and k != "location":
+ with Perf(metrics, "k in urlfields"):
+ for u3 in aslist(v):
+ deps = {"class": "File", "location": urljoin(base, u3)}
+ if nestdirs:
+ deps = nestdir(base, deps)
+ if r is None:
+ r = []
+ r.append(deps)
+ elif doc.get("class") in ("File", "Directory") and k in (
+ "listing",
+ "secondaryFiles",
+ ):
+ # should be handled earlier.
+ pass
+ else:
+ with Perf(metrics, "k is something else"):
+ sd = scandeps(
+ base,
+ cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = sd
+ else:
+ r.extend(sd)
+ elif isinstance(doc, MutableSequence):
+ with Perf(metrics, "d in doc"):
+ for d in doc:
+ sd = scandeps(
+ base,
+ d,
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+                if sd:
+                    if r is None:
+                        r = sd
+                    else:
+                        r.extend(sd)
+
+ if r and do_normalize:
+ normalizeFilesDirs(r)
+
+ return r
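+
+
+# Example (illustrative): upload_dependencies() above drives this copy of
+# scandeps() with the same arguments it previously passed to the cwltool
+# version:
+#
+#     sc_result = scandeps(uri, scanobj,
+#                          loadref_fields,
+#                          set(("$include", "location")),
+#                          loadref, urljoin=document_loader.fetcher.urljoin,
+#                          nestdirs=False)
+#
+# A None return means no dependencies were found, which is why the callers
+# above normalize None to an empty list before calling extend().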