19385: Work in progress checkpoint, submitting uses wrappers
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index e81f621750c8e2f8c7c310918b8f0725a68df9db..c2c992d4401db5a7531799ae99f7728098acec34 100644 (file)
@@ -51,15 +51,16 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -253,23 +254,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                 if sfname is None:
                     continue
 
-                p_location = primary["location"]
-                if "/" in p_location:
-                    sfpath = (
-                        p_location[0 : p_location.rindex("/") + 1]
-                        + sfname
-                    )
+                if isinstance(sfname, str):
+                    p_location = primary["location"]
+                    if "/" in p_location:
+                        sfpath = (
+                            p_location[0 : p_location.rindex("/") + 1]
+                            + sfname
+                        )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
@@ -284,7 +295,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run, runtimeContext,
+                        workflowobj, uri, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None,
                         cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
@@ -292,61 +303,27 @@ def upload_dependencies(arvrunner, name, document_loader,
     Returns a pathmapper object mapping local paths to keep references.  Also
     does an in-place update of references in "workflowobj".
 
-    Use scandeps to find $import, $include, $schemas, run, File and Directory
+    Use scandeps to find $schemas, File and Directory
     fields that represent external references.
 
     If workflowobj has an "id" field, this will reload the document to ensure
     it is scanning the raw document prior to preprocessing.
     """
 
-    loaded = set()
-    def loadref(b, u):
-        joined = document_loader.fetcher.urljoin(b, u)
-        defrg, _ = urllib.parse.urldefrag(joined)
-        if defrg not in loaded:
-            loaded.add(defrg)
-            if cache is not None and defrg in cache:
-                return cache[defrg]
-            # Use fetch_text to get raw file (before preprocessing).
-            text = document_loader.fetch_text(defrg)
-            if isinstance(text, bytes):
-                textIO = StringIO(text.decode('utf-8'))
-            else:
-                textIO = StringIO(text)
-            yamlloader = YAML(typ='safe', pure=True)
-            result = yamlloader.load(textIO)
-            if cache is not None:
-                cache[defrg] = result
-            return result
-        else:
-            return {}
-
-    if loadref_run:
-        loadref_fields = set(("$import", "run"))
-    else:
-        loadref_fields = set(("$import",))
-
     scanobj = workflowobj
-    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
-
     metadata = scanobj
 
-    with Perf(metrics, "scandeps include, location"):
+    with Perf(metrics, "scandeps"):
         sc_result = scandeps(uri, scanobj,
-                             loadref_fields,
-                             set(("$include", "location")),
-                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             set(),
+                             set(("location",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
                              nestdirs=False)
-
-    with Perf(metrics, "scandeps $schemas"):
         optional_deps = scandeps(uri, scanobj,
-                                      loadref_fields,
-                                      set(("$schemas",)),
-                                      loadref, urljoin=document_loader.fetcher.urljoin,
-                                      nestdirs=False)
+                             set(),
+                             set(("$schemas",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
 
     if sc_result is None:
         sc_result = []
@@ -414,8 +391,14 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -469,10 +452,10 @@ def upload_dependencies(arvrunner, name, document_loader,
         if k.startswith("keep:"):
             keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
-    def setloc(p):
+
+    def collectloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
-            p["location"] = mapper.mapper(p["location"]).resolved
             addkeepref(p["location"])
             return
 
@@ -503,12 +486,10 @@ def upload_dependencies(arvrunner, name, document_loader,
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
                 "Collection uuid %s not found" % uuid)
-        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
-        p[collectionUUID] = uuid
 
-    with Perf(metrics, "setloc"):
-        visit_class(workflowobj, ("File", "Directory"), setloc)
-        visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "collectloc"):
+        visit_class(workflowobj, ("File", "Directory"), collectloc)
+        visit_class(discovered, ("File", "Directory"), collectloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -532,6 +513,7 @@ def upload_dependencies(arvrunner, name, document_loader,
                 logger.warning("Cannot find collection with portable data hash %s", kr)
                 continue
             col = col["items"][0]
+            col["name"] = arvados.util.trim_name(col["name"])
             try:
                 arvrunner.api.collections().create(body={"collection": {
                     "owner_uuid": runtimeContext.project_uuid,
@@ -544,7 +526,7 @@ def upload_dependencies(arvrunner, name, document_loader,
                     "trash_at": col["trash_at"]
                 }}, ensure_unique_name=True).execute()
             except Exception as e:
-                logger.warning("Unable copy collection to destination: %s", e)
+                logger.warning("Unable to copy collection to destination: %s", e)
 
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
@@ -566,26 +548,16 @@ def upload_docker(arvrunner, tool, runtimeContext):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+                                                       True, runtimeContext)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -625,6 +597,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
             for l in v:
                 visit(l, cur_id)
     visit(packed, None)
+
+    if git_info:
+        for g in git_info:
+            packed[g] = git_info[g]
+
     return packed
 
 
@@ -638,6 +615,45 @@ def tag_git_version(packed):
         else:
             packed["http://schema.org/version"] = githash
 
+def setloc(mapper, p):
+    loc = p.get("location")
+    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+        p["location"] = mapper.mapper(p["location"]).resolved
+        return
+
+    if not loc:
+        return
+
+    if collectionUUID in p:
+        uuid = p[collectionUUID]
+        if uuid not in uuid_map:
+            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        gp = collection_pdh_pattern.match(loc)
+        if gp and uuid_map[uuid] != gp.groups()[0]:
+            # This file entry has both collectionUUID and a PDH
+            # location. If the PDH doesn't match the one returned
+            # the API server, raise an error.
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Expected collection uuid %s to be %s but API server reported %s" % (
+                    uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+    gp = collection_uuid_pattern.match(loc)
+    if not gp:
+        # Not a uuid pattern (must be a pdh pattern)
+        return
+
+    uuid = gp.groups()[0]
+    if uuid not in uuid_map:
+        raise SourceLine(p, "location", validate.ValidationException).makeError(
+            "Collection uuid %s not found" % uuid)
+    p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+    p[collectionUUID] = uuid
+
+
+def update_from_mapper(workflowobj, mapper):
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
 
 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
@@ -670,12 +686,14 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                              tool.tool["inputs"],
                              job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False,
                                     runtimeContext)
 
     if "id" in job_order:
@@ -686,6 +704,8 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     if "job_order" in job_order:
         del job_order["job_order"]
 
+    update_from_mapper(job_order, jobmapper)
+
     return job_order
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
@@ -693,35 +713,45 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    with Perf(metrics, "upload_docker"):
-        upload_docker(arvrunner, tool, runtimeContext)
+    # commented out for testing only, uncomment me
+    #with Perf(metrics, "upload_docker"):
+    #    upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
     merged_map = {}
     tool_dep_cache = {}
+
+    todo = []
+
+    # Standard traversal is top down, we want to go bottom up, so use
+    # the visitor to accumalate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
-                pm = upload_dependencies(arvrunner,
-                                         "%s dependencies" % (shortname(deptool["id"])),
-                                         document_loader,
-                                         deptool,
-                                         deptool["id"],
-                                         False,
-                                         runtimeContext,
-                                         include_primary=False,
-                                         discovered_secondaryfiles=discovered_secondaryfiles,
-                                         cache=tool_dep_cache)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+            todo.append(deptool)
 
     tool.visit(upload_tool_deps)
 
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+            pm = upload_dependencies(arvrunner,
+                                     "%s dependencies" % (shortname(deptool["id"])),
+                                     document_loader,
+                                     deptool,
+                                     deptool["id"],
+                                     runtimeContext,
+                                     include_primary=False,
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
+
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
     return merged_map
 
 def arvados_jobs_image(arvrunner, img, runtimeContext):
@@ -729,12 +759,7 @@ def arvados_jobs_image(arvrunner, img, runtimeContext):
 
     try:
         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
-                                                          True,
-                                                          runtimeContext.project_uuid,
-                                                          runtimeContext.force_docker_pull,
-                                                          runtimeContext.tmp_outdir_prefix,
-                                                          runtimeContext.match_local_docker,
-                                                          runtimeContext.copy_deps)
+                                                          True, runtimeContext)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
@@ -775,10 +800,11 @@ class Runner(Process):
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
-                 collection_cache_is_default=True):
+                 collection_cache_is_default=True,
+                 git_info=None):
 
-        loadingContext = loadingContext.copy()
-        loadingContext.metadata = updated_tool.metadata.copy()
+        self.loadingContext = loadingContext.copy()
+        self.loadingContext.metadata = updated_tool.metadata.copy()
 
         super(Runner, self).__init__(updated_tool.tool, loadingContext)
 
@@ -803,7 +829,9 @@ class Runner(Process):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
-        self.enable_dev = loadingContext.enable_dev
+        self.enable_dev = self.loadingContext.enable_dev
+        self.git_info = git_info
+        self.fast_parser = self.loadingContext.fast_parser
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
@@ -888,239 +916,3 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
-
-
-
-
-# --- from cwltool ---
-
-
-CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
-
-
-def scandeps(
-    base: str,
-    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
-    reffields: Set[str],
-    urlfields: Set[str],
-    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
-    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
-    nestdirs: bool = True,
-    do_normalize: bool = True,
-) -> Optional[MutableSequence[CWLObjectType]]:
-
-    """Given a CWL document or input object, search for dependencies
-    (references to external files) of 'doc' and return them as a list
-    of File or Directory objects.
-
-    The 'base' is the base URL for relative references.
-
-    Looks for objects with 'class: File' or 'class: Directory' and
-    adds them to the list of dependencies.
-
-    Anything in 'urlfields' is also added as a File dependency.
-
-    Anything in 'reffields' (such as workflow step 'run') will be
-    added as a dependency and also loaded (using the 'loadref'
-    function) and recursively scanned for dependencies.  Those
-    dependencies will be added as secondary files to the primary file.
-
-    If "nestdirs" is true, create intermediate directory objects when
-    a file is located in a subdirectory under the starting directory.
-    This is so that if the dependencies are materialized, they will
-    produce the same relative file system locations.
-
-    """
-
-    if do_normalize:
-        import pprint
-        pprint.pprint(doc)
-
-    r: Optional[MutableSequence[CWLObjectType]] = None
-    if isinstance(doc, MutableMapping):
-        if "id" in doc:
-            if cast(str, doc["id"]).startswith("file://"):
-                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
-                if base != df:
-                    if r is None:
-                        r = []
-                    r.append({"class": "File", "location": df, "format": CWL_IANA})
-                    base = df
-
-        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
-            with Perf(metrics, "File or Directory with location"):
-                u = cast(Optional[str], doc.get("location", doc.get("path")))
-                if u and not u.startswith("_:"):
-                    deps = {
-                        "class": doc["class"],
-                        "location": urljoin(base, u),
-                    }  # type: CWLObjectType
-                    if "basename" in doc:
-                        deps["basename"] = doc["basename"]
-                    if doc["class"] == "Directory" and "listing" in doc:
-                        deps["listing"] = doc["listing"]
-                    if doc["class"] == "File" and "secondaryFiles" in doc:
-                        sd = scandeps(
-                            base,
-                            cast(
-                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
-                                doc["secondaryFiles"],
-                            ),
-                            reffields,
-                            urlfields,
-                            loadref,
-                            urljoin=urljoin,
-                            nestdirs=nestdirs,
-                            do_normalize=False,
-                        )
-                        if sd:
-                            deps["secondaryFiles"] = cast(
-                                CWLOutputAtomType,
-                                sd
-                            )
-                    if nestdirs:
-                        deps = nestdir(base, deps)
-                    if r is None:
-                        r = []
-                    r.append(deps)
-                else:
-                    if doc["class"] == "Directory" and "listing" in doc:
-                        sd = scandeps(
-                                base,
-                                cast(MutableSequence[CWLObjectType], doc["listing"]),
-                                reffields,
-                                urlfields,
-                                loadref,
-                                urljoin=urljoin,
-                                nestdirs=nestdirs,
-                                do_normalize=False,
-                            )
-                        if sd:
-                            if r is None:
-                                r = []
-                            r.extend(sd)
-                    elif doc["class"] == "File" and "secondaryFiles" in doc:
-                        sd = scandeps(
-                                base,
-                                cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
-                                reffields,
-                                urlfields,
-                                loadref,
-                                urljoin=urljoin,
-                                nestdirs=nestdirs,
-                                do_normalize=False,
-                            )
-                        if sd:
-                            if r is None:
-                                r = sd
-                            else:
-                                r.extend(sd)
-
-        for k, v in doc.items():
-            if k in reffields:
-                with Perf(metrics, "k in reffields"):
-                    for u2 in aslist(v):
-                        if isinstance(u2, MutableMapping):
-                            sd = scandeps(
-                                    base,
-                                    u2,
-                                    reffields,
-                                    urlfields,
-                                    loadref,
-                                    urljoin=urljoin,
-                                    nestdirs=nestdirs,
-                                    do_normalize=False,
-                                )
-                            if sd:
-                                if r is None:
-                                    r = sd
-                                else:
-                                    r.extend(sd)
-                        else:
-                            subid = urljoin(base, u2)
-                            basedf, _ = urllib.parse.urldefrag(base)
-                            subiddf, _ = urllib.parse.urldefrag(subid)
-                            if basedf == subiddf:
-                                continue
-                            sub = cast(
-                                Union[MutableSequence[CWLObjectType], CWLObjectType],
-                                loadref(base, u2),
-                            )
-                            deps2 = {
-                                "class": "File",
-                                "location": subid,
-                                "format": CWL_IANA,
-                            }  # type: CWLObjectType
-                            sf = scandeps(
-                                subid,
-                                sub,
-                                reffields,
-                                urlfields,
-                                loadref,
-                                urljoin=urljoin,
-                                nestdirs=nestdirs,
-                                do_normalize=False,
-                            )
-                            if sf:
-                                deps2["secondaryFiles"] = cast(
-                                    MutableSequence[CWLOutputAtomType], mergedirs(sf)
-                                )
-                            if nestdirs:
-                                deps2 = nestdir(base, deps2)
-                            if r is None:
-                                r = []
-                            r.append(deps2)
-            elif k in urlfields and k != "location":
-                with Perf(metrics, "k in urlfields"):
-                    for u3 in aslist(v):
-                        deps = {"class": "File", "location": urljoin(base, u3)}
-                        if nestdirs:
-                            deps = nestdir(base, deps)
-                        if r is None:
-                            r = []
-                        r.append(deps)
-            elif doc.get("class") in ("File", "Directory") and k in (
-                "listing",
-                "secondaryFiles",
-            ):
-                # should be handled earlier.
-                pass
-            else:
-                with Perf(metrics, "k is something else"):
-                    sd = scandeps(
-                            base,
-                            cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
-                            reffields,
-                            urlfields,
-                            loadref,
-                            urljoin=urljoin,
-                            nestdirs=nestdirs,
-                            do_normalize=False,
-                        )
-                    if sd:
-                        if r is None:
-                            r = sd
-                        else:
-                            r.extend(sd)
-    elif isinstance(doc, MutableSequence):
-        with Perf(metrics, "d in doc"):
-            for d in doc:
-                sd = scandeps(
-                        base,
-                        d,
-                        reffields,
-                        urlfields,
-                        loadref,
-                        urljoin=urljoin,
-                        nestdirs=nestdirs,
-                        do_normalize=False,
-                    )
-                if r is None:
-                    r = sd
-                else:
-                    r.extend(sd)
-
-    if r and do_normalize:
-        normalizeFilesDirs(r)
-
-    return r