import copy
from collections import namedtuple
from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ MutableMapping,
+ MutableSequence,
+ Optional,
+ Sequence,
+ Set,
+ Sized,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+from cwltool.utils import (
+ CWLObjectType,
+ CWLOutputAtomType,
+ CWLOutputType,
+)
if os.name == "posix" and sys.version_info[0] < 3:
import subprocess32 as subprocess
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
import schema_salad.validate as validate
import arvados.collection
+import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
+from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
def trim_anonymous_location(obj):
"""Remove 'location' field from File and Directory literals.
set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
return
+ if inputschema == "File":
+ inputschema = {"type": "File"}
+
if isinstance(inputschema, basestring):
sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
if sd:
set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
elif (inputschema["type"] == "File" and
- secondaryspec and
isinstance(primary, Mapping) and
- primary.get("class") == "File" and
- "secondaryFiles" not in primary):
+ primary.get("class") == "File"):
+
+ if "secondaryFiles" in primary or not secondaryspec:
+ # Nothing to do.
+ return
+
#
# Found a file, check for secondaryFiles
#
primary["secondaryFiles"] = secondaryspec
for i, sf in enumerate(aslist(secondaryspec)):
if builder.cwlVersion == "v1.0":
- pattern = builder.do_eval(sf, context=primary)
+ pattern = sf
else:
- pattern = builder.do_eval(sf["pattern"], context=primary)
+ pattern = sf["pattern"]
if pattern is None:
continue
if isinstance(pattern, list):
"Expression must return list, object, string or null")
if pattern is not None:
- sfpath = substitute(primary["location"], pattern)
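+ # Only evaluate the pattern as a CWL expression when it contains
+ # expression syntax; otherwise apply it as a plain substitution
+ # on the primary file's basename.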
+ if "${" in pattern or "$(" in pattern:
+ sfname = builder.do_eval(pattern, context=primary)
+ else:
+ sfname = substitute(primary["basename"], pattern)
+
+ if sfname is None:
+ continue
+
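+ # Place the secondary file alongside the primary file by reusing
+ # the directory portion of the primary's location.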
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
primary["secondaryFiles"] = cmap(found)
if discovered is not None:
discovered[primary["location"]] = primary["secondaryFiles"]
- elif inputschema["type"] not in primitive_types_set:
+ elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run,
- include_primary=True, discovered_secondaryfiles=None):
+ workflowobj, uri, loadref_run, runtimeContext,
+ include_primary=True, discovered_secondaryfiles=None,
+ cache=None):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
defrg, _ = urllib.parse.urldefrag(joined)
if defrg not in loaded:
loaded.add(defrg)
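+ # Serve repeated references from the cache instead of fetching
+ # and parsing the same document again.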
+ if cache is not None and defrg in cache:
+ return cache[defrg]
# Use fetch_text to get raw file (before preprocessing).
text = document_loader.fetch_text(defrg)
if isinstance(text, bytes):
else:
textIO = StringIO(text)
yamlloader = YAML(typ='safe', pure=True)
- return yamlloader.load(textIO)
+ result = yamlloader.load(textIO)
+ if cache is not None:
+ cache[defrg] = result
+ return result
else:
return {}
metadata = scanobj
- sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ with Perf(metrics, "scandeps include, location"):
+ sc_result = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$include", "location")),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
+
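+ # Scan $schemas separately; these are collected as optional
+ # dependencies.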
+ with Perf(metrics, "scandeps $schemas"):
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
+
+ if sc_result is None:
+ sc_result = []
- optional_deps = scandeps(uri, scanobj,
- loadref_fields,
- set(("$schemas",)),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ if optional_deps is None:
+ optional_deps = []
- sc_result.extend(optional_deps)
+ if optional_deps:
+ sc_result.extend(optional_deps)
sc = []
uuids = {}
sc.append(obj)
collect_uuids(obj)
- visit_class(workflowobj, ("File", "Directory"), collect_uuids)
- visit_class(sc_result, ("File", "Directory"), collect_uploads)
+ with Perf(metrics, "collect uuids"):
+ visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+ with Perf(metrics, "collect uploads"):
+ visit_class(sc_result, ("File", "Directory"), collect_uploads)
# Resolve any collection uuids we found to portable data hashes
# and assign them to uuid_map
uuid_map = {}
fetch_uuids = list(uuids.keys())
- while fetch_uuids:
- # For a large number of fetch_uuids, API server may limit
- # response size, so keep fetching from API server has nothing
- # more to give us.
- lookups = arvrunner.api.collections().list(
- filters=[["uuid", "in", fetch_uuids]],
- count="none",
- select=["uuid", "portable_data_hash"]).execute(
- num_retries=arvrunner.num_retries)
+ with Perf(metrics, "fetch_uuids"):
+ while fetch_uuids:
+ # For a large number of fetch_uuids, the API server may limit the
+ # response size, so keep fetching until the API server has nothing
+ # more to give us.
+ lookups = arvrunner.api.collections().list(
+ filters=[["uuid", "in", fetch_uuids]],
+ count="none",
+ select=["uuid", "portable_data_hash"]).execute(
+ num_retries=arvrunner.num_retries)
- if not lookups["items"]:
- break
+ if not lookups["items"]:
+ break
- for l in lookups["items"]:
- uuid_map[l["uuid"]] = l["portable_data_hash"]
+ for l in lookups["items"]:
+ uuid_map[l["uuid"]] = l["portable_data_hash"]
- fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+ fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
normalizeFilesDirs(sc)
if "location" not in f and "path" in f:
f["location"] = f["path"]
del f["path"]
+ normalizeFilesDirs(f)
optional_deps.append(f)
visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
else:
del discovered[d]
- mapper = ArvPathMapper(arvrunner, sc, "",
- "keep:%s",
- "keep:%s/%s",
- name=name,
- single_collection=True,
- optional_deps=optional_deps)
+ with Perf(metrics, "mapper"):
+ mapper = ArvPathMapper(arvrunner, sc, "",
+ "keep:%s",
+ "keep:%s/%s",
+ name=name,
+ single_collection=True,
+ optional_deps=optional_deps)
+
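+ # Record the portable data hash of every keep: reference so the
+ # referenced collections can be copied into the destination
+ # project below (see copy_deps).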
+ keeprefs = set()
+ def addkeepref(k):
+ if k.startswith("keep:"):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
def setloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ addkeepref(p["location"])
return
if not loc:
gp = collection_uuid_pattern.match(loc)
if not gp:
+ # Not a UUID pattern, so it must be a PDH pattern.
+ addkeepref(p["location"])
return
+
uuid = gp.groups()[0]
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
p[collectionUUID] = uuid
- visit_class(workflowobj, ("File", "Directory"), setloc)
- visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), setloc)
+ visit_class(discovered, ("File", "Directory"), setloc)
if discovered_secondaryfiles is not None:
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
+ if runtimeContext.copy_deps:
+ # Find referenced collections and copy them into the
+ # destination project, for easy sharing.
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+ filters=[["portable_data_hash", "in", list(keeprefs)],
+ ["owner_uuid", "=", runtimeContext.project_uuid]],
+ select=["uuid", "portable_data_hash", "created_at"]))
+
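+ # Only copy collections that the destination project does not
+ # already own.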
+ keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+ for kr in keeprefs:
+ col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+ order="created_at desc",
+ select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+ limit=1).execute()
+ if len(col["items"]) == 0:
+ logger.warning("Cannot find collection with portable data hash %s", kr)
+ continue
+ col = col["items"][0]
+ try:
+ arvrunner.api.collections().create(body={"collection": {
+ "owner_uuid": runtimeContext.project_uuid,
+ "name": col["name"],
+ "description": col["description"],
+ "properties": col["properties"],
+ "portable_data_hash": col["portable_data_hash"],
+ "manifest_text": col["manifest_text"],
+ "storage_classes_desired": col["storage_classes_desired"],
+ "trash_at": col["trash_at"]
+ }}, ensure_unique_name=True).execute()
+ except Exception as e:
+ logger.warning("Unable copy collection to destination: %s", e)
+
if "$schemas" in workflowobj:
sch = CommentedSeq()
for s in workflowobj["$schemas"]:
return mapper
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
- # TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
+ upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
packed["http://schema.org/version"] = githash
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
"""
tool.doc_loader,
job_order,
job_order.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if "id" in job_order:
del job_order["id"]
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool)
+ with Perf(metrics, "upload_docker"):
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
merged_map = {}
-
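+ # One shared cache of fetched documents, so each tool referenced
+ # by the workflow is loaded at most once.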
+ tool_dep_cache = {}
def upload_tool_deps(deptool):
if "id" in deptool:
discovered_secondaryfiles = {}
- pm = upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- include_primary=False,
- discovered_secondaryfiles=discovered_secondaryfiles)
+ with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+ pm = upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ runtimeContext,
+ include_primary=False,
+ discovered_secondaryfiles=discovered_secondaryfiles,
+ cache=tool_dep_cache)
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():
return merged_map
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
collection = arvados.collection.Collection(api_client=arvrunner.api,
keep_client=arvrunner.keep_client,
num_retries=arvrunner.num_retries)
filters = [["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", name+"%"]]
- if arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ if runtimeContext.project_uuid:
+ filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
else:
collection.save_new(name=name,
- owner_uuid=arvrunner.project_uuid,
+ owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
num_retries=arvrunner.num_retries)
logger.info("Uploaded to %s", collection.manifest_locator())
self.arvrunner.output_callback({}, "permanentFail")
else:
self.arvrunner.output_callback(outputs, processStatus)
+
+
+# --- from cwltool ---
+
+
+CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
+
+
+def scandeps(
+ base: str,
+ doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
+ reffields: Set[str],
+ urlfields: Set[str],
+ loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
+ urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
+ nestdirs: bool = True,
+ do_normalize: bool = True,
+) -> Optional[MutableSequence[CWLObjectType]]:
+ """Given a CWL document or input object, search for dependencies
+ (references to external files) of 'doc' and return them as a list
+ of File or Directory objects.
+
+ The 'base' is the base URL for relative references.
+
+ Looks for objects with 'class: File' or 'class: Directory' and
+ adds them to the list of dependencies.
+
+ Anything in 'urlfields' is also added as a File dependency.
+
+ Anything in 'reffields' (such as workflow step 'run') will be
+ added as a dependency and also loaded (using the 'loadref'
+ function) and recursively scanned for dependencies. Those
+ dependencies will be added as secondary files to the primary file.
+
+ If "nestdirs" is true, create intermediate directory objects when
+ a file is located in a subdirectory under the starting directory.
+ This is so that if the dependencies are materialized, they will
+ produce the same relative file system locations.
+
+ """
+
+ r: Optional[MutableSequence[CWLObjectType]] = None
+ if isinstance(doc, MutableMapping):
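+ # A document with its own file:// id is itself a dependency and
+ # becomes the new base for resolving relative references.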
+ if "id" in doc:
+ if cast(str, doc["id"]).startswith("file://"):
+ df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
+ if base != df:
+ if r is None:
+ r = []
+ r.append({"class": "File", "location": df, "format": CWL_IANA})
+ base = df
+
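+ # File and Directory objects are dependencies in their own right;
+ # "_:" locations denote literals with no backing file and are skipped.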
+ if doc.get("class") in ("File", "Directory") and "location" in urlfields:
+ with Perf(metrics, "File or Directory with location"):
+ u = cast(Optional[str], doc.get("location", doc.get("path")))
+ if u and not u.startswith("_:"):
+ deps = {
+ "class": doc["class"],
+ "location": urljoin(base, u),
+ } # type: CWLObjectType
+ if "basename" in doc:
+ deps["basename"] = doc["basename"]
+ if doc["class"] == "Directory" and "listing" in doc:
+ deps["listing"] = doc["listing"]
+ if doc["class"] == "File" and "secondaryFiles" in doc:
+ sd = scandeps(
+ base,
+ cast(
+ Union[CWLObjectType, MutableSequence[CWLObjectType]],
+ doc["secondaryFiles"],
+ ),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ deps["secondaryFiles"] = cast(
+ CWLOutputAtomType,
+ sd
+ )
+ if nestdirs:
+ deps = nestdir(base, deps)
+ if r is None:
+ r = []
+ r.append(deps)
+ else:
+ if doc["class"] == "Directory" and "listing" in doc:
+ sd = scandeps(
+ base,
+ cast(MutableSequence[CWLObjectType], doc["listing"]),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = []
+ r.extend(sd)
+ elif doc["class"] == "File" and "secondaryFiles" in doc:
+ sd = scandeps(
+ base,
+ cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = sd
+ else:
+ r.extend(sd)
+
+ for k, v in doc.items():
+ if k in reffields:
+ with Perf(metrics, "k in reffields"):
+ for u2 in aslist(v):
+ if isinstance(u2, MutableMapping):
+ sd = scandeps(
+ base,
+ u2,
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = sd
+ else:
+ r.extend(sd)
+ else:
+ subid = urljoin(base, u2)
+ basedf, _ = urllib.parse.urldefrag(base)
+ subiddf, _ = urllib.parse.urldefrag(subid)
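+ # A fragment-only reference into the current document is not an
+ # external dependency.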
+ if basedf == subiddf:
+ continue
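+ # Load the referenced document (e.g. a workflow step's "run") and
+ # scan it recursively; its dependencies become secondaryFiles of
+ # the reference.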
+ sub = cast(
+ Union[MutableSequence[CWLObjectType], CWLObjectType],
+ loadref(base, u2),
+ )
+ deps2 = {
+ "class": "File",
+ "location": subid,
+ "format": CWL_IANA,
+ } # type: CWLObjectType
+ sf = scandeps(
+ subid,
+ sub,
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sf:
+ deps2["secondaryFiles"] = cast(
+ MutableSequence[CWLOutputAtomType], mergedirs(sf)
+ )
+ if nestdirs:
+ deps2 = nestdir(base, deps2)
+ if r is None:
+ r = []
+ r.append(deps2)
+ elif k in urlfields and k != "location":
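+ # Other URL-valued fields (such as $schemas) become plain File
+ # dependencies.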
+ with Perf(metrics, "k in urlfields"):
+ for u3 in aslist(v):
+ deps = {"class": "File", "location": urljoin(base, u3)}
+ if nestdirs:
+ deps = nestdir(base, deps)
+ if r is None:
+ r = []
+ r.append(deps)
+ elif doc.get("class") in ("File", "Directory") and k in (
+ "listing",
+ "secondaryFiles",
+ ):
+ # should be handled earlier.
+ pass
+ else:
+ with Perf(metrics, "k is something else"):
+ sd = scandeps(
+ base,
+ cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = sd
+ else:
+ r.extend(sd)
+ elif isinstance(doc, MutableSequence):
+ with Perf(metrics, "d in doc"):
+ for d in doc:
+ sd = scandeps(
+ base,
+ d,
+ reffields,
+ urlfields,
+ loadref,
+ urljoin=urljoin,
+ nestdirs=nestdirs,
+ do_normalize=False,
+ )
+ if sd:
+ if r is None:
+ r = sd
+ else:
+ r.extend(sd)
+
+ if r and do_normalize:
+ normalizeFilesDirs(r)
+
+ return r