19280: what are we scanning here actually
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index e5a81cdc73fe77238b743705c1dd4c8847240d18..e81f621750c8e2f8c7c310918b8f0725a68df9db 100644 (file)
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -28,7 +51,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
@@ -39,6 +62,7 @@ from cwltool.builder import Builder
 import schema_salad.validate as validate
 
 import arvados.collection
+import arvados.util
 from .util import collectionUUID
 from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
@@ -48,8 +72,10 @@ from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, col
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
+from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
@@ -128,6 +154,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
         return
 
+    if inputschema == "File":
+        inputschema = {"type": "File"}
+
     if isinstance(inputschema, basestring):
         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
         if sd:
@@ -163,10 +192,13 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
 
     elif (inputschema["type"] == "File" and
-          secondaryspec and
           isinstance(primary, Mapping) and
-          primary.get("class") == "File" and
-          "secondaryFiles" not in primary):
+          primary.get("class") == "File"):
+
+        if "secondaryFiles" in primary or not secondaryspec:
+            # Nothing to do.
+            return
+
         #
         # Found a file, check for secondaryFiles
         #
@@ -174,9 +206,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
             if builder.cwlVersion == "v1.0":
-                pattern = builder.do_eval(sf, context=primary)
+                pattern = sf
             else:
-                pattern = builder.do_eval(sf["pattern"], context=primary)
+                pattern = sf["pattern"]
             if pattern is None:
                 continue
             if isinstance(pattern, list):
@@ -213,7 +245,20 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                     "Expression must return list, object, string or null")
 
             if pattern is not None:
-                sfpath = substitute(primary["location"], pattern)
+                if "${" in pattern or "$(" in pattern:
+                    sfname = builder.do_eval(pattern, context=primary)
+                else:
+                    sfname = substitute(primary["basename"], pattern)
+
+                if sfname is None:
+                    continue
+
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )
 
             required = builder.do_eval(required, context=primary)
 
@@ -229,7 +274,7 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
-    elif inputschema["type"] not in primitive_types_set:
+    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
 
 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
@@ -239,8 +284,9 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
-                        include_primary=True, discovered_secondaryfiles=None):
+                        workflowobj, uri, loadref_run, runtimeContext,
+                        include_primary=True, discovered_secondaryfiles=None,
+                        cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
@@ -259,6 +305,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
+            if cache is not None and defrg in cache:
+                return cache[defrg]
             # Use fetch_text to get raw file (before preprocessing).
             text = document_loader.fetch_text(defrg)
             if isinstance(text, bytes):
@@ -266,7 +314,10 @@ def upload_dependencies(arvrunner, name, document_loader,
             else:
                 textIO = StringIO(text)
             yamlloader = YAML(typ='safe', pure=True)
-            return yamlloader.load(textIO)
+            result = yamlloader.load(textIO)
+            if cache is not None:
+                cache[defrg] = result
+            return result
         else:
             return {}
 
@@ -283,19 +334,28 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
+
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                      loadref_fields,
+                                      set(("$schemas",)),
+                                      loadref, urljoin=document_loader.fetcher.urljoin,
+                                      nestdirs=False)
+
+    if sc_result is None:
+        sc_result = []
 
-    optional_deps = scandeps(uri, scanobj,
-                                  loadref_fields,
-                                  set(("$schemas",)),
-                                  loadref, urljoin=document_loader.fetcher.urljoin,
-                                  nestdirs=False)
+    if optional_deps is None:
+        optional_deps = []
 
-    sc_result.extend(optional_deps)
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -323,30 +383,34 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
         collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching from API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching until API server has nothing
+            # more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
@@ -358,6 +422,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
+            normalizeFilesDirs(f)
             optional_deps.append(f)
         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
@@ -391,17 +456,24 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
+
+    keeprefs = set()
+    def addkeepref(k):
+        if k.startswith("keep:"):
+            keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
     def setloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            addkeepref(p["location"])
             return
 
         if not loc:
@@ -423,7 +495,10 @@ def upload_dependencies(arvrunner, name, document_loader,
 
         gp = collection_uuid_pattern.match(loc)
         if not gp:
+            # Not a uuid pattern (must be a pdh pattern)
+            addkeepref(p["location"])
             return
+
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
@@ -431,13 +506,46 @@ def upload_dependencies(arvrunner, name, document_loader,
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
+    if runtimeContext.copy_deps:
+        # Find referenced collections and copy them into the
+        # destination project, for easy sharing.
+        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+                                     filters=[["portable_data_hash", "in", list(keeprefs)],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
+                                     select=["uuid", "portable_data_hash", "created_at"]))
+
+        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+        for kr in keeprefs:
+            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+                                                  order="created_at desc",
+                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+                                                   limit=1).execute()
+            if len(col["items"]) == 0:
+                logger.warning("Cannot find collection with portable data hash %s", kr)
+                continue
+            col = col["items"][0]
+            try:
+                arvrunner.api.collections().create(body={"collection": {
+                    "owner_uuid": runtimeContext.project_uuid,
+                    "name": col["name"],
+                    "description": col["description"],
+                    "properties": col["properties"],
+                    "portable_data_hash": col["portable_data_hash"],
+                    "manifest_text": col["manifest_text"],
+                    "storage_classes_desired": col["storage_classes_desired"],
+                    "trash_at": col["trash_at"]
+                }}, ensure_unique_name=True).execute()
+            except Exception as e:
+                logger.warning("Unable copy collection to destination: %s", e)
+
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
@@ -448,32 +556,36 @@ def upload_dependencies(arvrunner, name, document_loader,
     return mapper
 
 
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
-                # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker)
+
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker)
+                                                       True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -502,10 +614,11 @@ def packed_workflow(arvrunner, tool, merged_map):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             arvrunner.project_uuid,
-                                                                                                             arvrunner.runtimeContext.force_docker_pull,
-                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                                                                             arvrunner.runtimeContext.match_local_docker)
+                                                                                                             runtimeContext.project_uuid,
+                                                                                                             runtimeContext.force_docker_pull,
+                                                                                                             runtimeContext.tmp_outdir_prefix,
+                                                                                                             runtimeContext.match_local_docker,
+                                                                                                             runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -526,7 +639,7 @@ def tag_git_version(packed):
             packed["http://schema.org/version"] = githash
 
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
@@ -562,7 +675,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    False,
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
@@ -576,26 +690,30 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    with Perf(metrics, "upload_docker"):
+        upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
     merged_map = {}
-
+    tool_dep_cache = {}
     def upload_tool_deps(deptool):
         if "id" in deptool:
             discovered_secondaryfiles = {}
-            pm = upload_dependencies(arvrunner,
-                                     "%s dependencies" % (shortname(deptool["id"])),
-                                     document_loader,
-                                     deptool,
-                                     deptool["id"],
-                                     False,
-                                     include_primary=False,
-                                     discovered_secondaryfiles=discovered_secondaryfiles)
+            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+                pm = upload_dependencies(arvrunner,
+                                         "%s dependencies" % (shortname(deptool["id"])),
+                                         document_loader,
+                                         deptool,
+                                         deptool["id"],
+                                         False,
+                                         runtimeContext,
+                                         include_primary=False,
+                                         discovered_secondaryfiles=discovered_secondaryfiles,
+                                         cache=tool_dep_cache)
             document_loader.idx[deptool["id"]] = deptool
             toolmap = {}
             for k,v in pm.items():
@@ -606,19 +724,22 @@ def upload_workflow_deps(arvrunner, tool):
 
     return merged_map
 
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
-                                                          arvrunner.runtimeContext.force_docker_pull,
-                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                          arvrunner.runtimeContext.match_local_docker)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True,
+                                                          runtimeContext.project_uuid,
+                                                          runtimeContext.force_docker_pull,
+                                                          runtimeContext.tmp_outdir_prefix,
+                                                          runtimeContext.match_local_docker,
+                                                          runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
@@ -627,15 +748,15 @@ def upload_workflow_collection(arvrunner, name, packed):
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
-    if arvrunner.project_uuid:
-        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    if runtimeContext.project_uuid:
+        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
-                            owner_uuid=arvrunner.project_uuid,
+                            owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())
@@ -767,3 +888,239 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
+
+
+# --- from cwltool ---
+
+
+CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
+
+
+def scandeps(
+    base: str,
+    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
+    reffields: Set[str],
+    urlfields: Set[str],
+    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
+    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
+    nestdirs: bool = True,
+    do_normalize: bool = True,
+) -> Optional[MutableSequence[CWLObjectType]]:
+
+    """Given a CWL document or input object, search for dependencies
+    (references to external files) of 'doc' and return them as a list
+    of File or Directory objects.
+
+    The 'base' is the base URL for relative references.
+
+    Looks for objects with 'class: File' or 'class: Directory' and
+    adds them to the list of dependencies.
+
+    Anything in 'urlfields' is also added as a File dependency.
+
+    Anything in 'reffields' (such as workflow step 'run') will be
+    added as a dependency and also loaded (using the 'loadref'
+    function) and recursively scanned for dependencies.  Those
+    dependencies will be added as secondary files to the primary file.
+
+    If "nestdirs" is true, create intermediate directory objects when
+    a file is located in a subdirectory under the starting directory.
+    This is so that if the dependencies are materialized, they will
+    produce the same relative file system locations.
+
+    If "do_normalize" is true, normalizeFilesDirs() is applied to a
+    non-empty result before it is returned.  Every recursive call made
+    from here passes do_normalize=False, so normalization happens only
+    once, in the outermost call.
+
+    Returns None (not an empty list) when no dependencies were found,
+    so callers must be prepared for a None result.
+
+    """
+
+    # NOTE(review): nestdir() and mergedirs() are used below but are not
+    # imported in the visible part of this file (the cwltool.process
+    # import lists neither) -- confirm they are in scope.
+
+    r: Optional[MutableSequence[CWLObjectType]] = None
+    if isinstance(doc, MutableMapping):
+        if "id" in doc:
+            if cast(str, doc["id"]).startswith("file://"):
+                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
+                if base != df:
+                    if r is None:
+                        r = []
+                    r.append({"class": "File", "location": df, "format": CWL_IANA})
+                    # Everything nested in this document is resolved
+                    # relative to the document's own file.
+                    base = df
+
+        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
+            with Perf(metrics, "File or Directory with location"):
+                u = cast(Optional[str], doc.get("location", doc.get("path")))
+                if u and not u.startswith("_:"):
+                    deps = {
+                        "class": doc["class"],
+                        "location": urljoin(base, u),
+                    }  # type: CWLObjectType
+                    if "basename" in doc:
+                        deps["basename"] = doc["basename"]
+                    if doc["class"] == "Directory" and "listing" in doc:
+                        deps["listing"] = doc["listing"]
+                    if doc["class"] == "File" and "secondaryFiles" in doc:
+                        sd = scandeps(
+                            base,
+                            cast(
+                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
+                                doc["secondaryFiles"],
+                            ),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        )
+                        if sd:
+                            deps["secondaryFiles"] = cast(
+                                CWLOutputAtomType,
+                                sd
+                            )
+                    if nestdirs:
+                        deps = nestdir(base, deps)
+                    if r is None:
+                        r = []
+                    r.append(deps)
+                else:
+                    # No location, or a "_:" location: the object itself
+                    # is not added, but its children are still scanned.
+                    if doc["class"] == "Directory" and "listing" in doc:
+                        sd = scandeps(
+                                base,
+                                cast(MutableSequence[CWLObjectType], doc["listing"]),
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                        if sd:
+                            if r is None:
+                                r = []
+                            r.extend(sd)
+                    elif doc["class"] == "File" and "secondaryFiles" in doc:
+                        sd = scandeps(
+                                base,
+                                cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                        if sd:
+                            if r is None:
+                                r = sd
+                            else:
+                                r.extend(sd)
+
+        for k, v in doc.items():
+            if k in reffields:
+                with Perf(metrics, "k in reffields"):
+                    for u2 in aslist(v):
+                        if isinstance(u2, MutableMapping):
+                            # Reference given inline as an object: scan it
+                            # in place, nothing to load.
+                            sd = scandeps(
+                                    base,
+                                    u2,
+                                    reffields,
+                                    urlfields,
+                                    loadref,
+                                    urljoin=urljoin,
+                                    nestdirs=nestdirs,
+                                    do_normalize=False,
+                                )
+                            if sd:
+                                if r is None:
+                                    r = sd
+                                else:
+                                    r.extend(sd)
+                        else:
+                            subid = urljoin(base, u2)
+                            basedf, _ = urllib.parse.urldefrag(base)
+                            subiddf, _ = urllib.parse.urldefrag(subid)
+                            # A reference into the same file as 'base'
+                            # (differing only by fragment) is skipped.
+                            if basedf == subiddf:
+                                continue
+                            sub = cast(
+                                Union[MutableSequence[CWLObjectType], CWLObjectType],
+                                loadref(base, u2),
+                            )
+                            deps2 = {
+                                "class": "File",
+                                "location": subid,
+                                "format": CWL_IANA,
+                            }  # type: CWLObjectType
+                            sf = scandeps(
+                                subid,
+                                sub,
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                            if sf:
+                                deps2["secondaryFiles"] = cast(
+                                    MutableSequence[CWLOutputAtomType], mergedirs(sf)
+                                )
+                            if nestdirs:
+                                deps2 = nestdir(base, deps2)
+                            if r is None:
+                                r = []
+                            r.append(deps2)
+            elif k in urlfields and k != "location":
+                with Perf(metrics, "k in urlfields"):
+                    for u3 in aslist(v):
+                        deps = {"class": "File", "location": urljoin(base, u3)}
+                        if nestdirs:
+                            deps = nestdir(base, deps)
+                        if r is None:
+                            r = []
+                        r.append(deps)
+            elif doc.get("class") in ("File", "Directory") and k in (
+                "listing",
+                "secondaryFiles",
+            ):
+                # should be handled earlier.
+                pass
+            else:
+                with Perf(metrics, "k is something else"):
+                    sd = scandeps(
+                            base,
+                            cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        )
+                    if sd:
+                        if r is None:
+                            r = sd
+                        else:
+                            r.extend(sd)
+    elif isinstance(doc, MutableSequence):
+        with Perf(metrics, "d in doc"):
+            for d in doc:
+                sd = scandeps(
+                        base,
+                        d,
+                        reffields,
+                        urlfields,
+                        loadref,
+                        urljoin=urljoin,
+                        nestdirs=nestdirs,
+                        do_normalize=False,
+                    )
+                # An element with no dependencies yields None; skip it so
+                # that r.extend() is never called with None.
+                if sd:
+                    if r is None:
+                        r = sd
+                    else:
+                        r.extend(sd)
+
+    if r and do_normalize:
+        normalizeFilesDirs(r)
+
+    return r