19280: Metrics on scandeps
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 7d6d287a207455d04e2a1d5469e053ea57f7cf6e..6db7dfb2e362c705c26c4d8bcfcda854e5b0cbf9 100644 (file)
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -28,7 +51,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
@@ -39,8 +62,9 @@ from cwltool.builder import Builder
 import schema_salad.validate as validate
 
 import arvados.collection
 import schema_salad.validate as validate
 
 import arvados.collection
+import arvados.util
 from .util import collectionUUID
 from .util import collectionUUID
-import ruamel.yaml as yaml
+from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
@@ -48,8 +72,10 @@ from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, col
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
+from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
 
 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
@@ -128,6 +154,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
         return
 
             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
         return
 
+    if inputschema == "File":
+        inputschema = {"type": "File"}
+
     if isinstance(inputschema, basestring):
         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
         if sd:
     if isinstance(inputschema, basestring):
         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
         if sd:
@@ -163,10 +192,13 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
 
     elif (inputschema["type"] == "File" and
             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
 
     elif (inputschema["type"] == "File" and
-          secondaryspec and
           isinstance(primary, Mapping) and
           isinstance(primary, Mapping) and
-          primary.get("class") == "File" and
-          "secondaryFiles" not in primary):
+          primary.get("class") == "File"):
+
+        if "secondaryFiles" in primary or not secondaryspec:
+            # Nothing to do.
+            return
+
         #
         # Found a file, check for secondaryFiles
         #
         #
         # Found a file, check for secondaryFiles
         #
@@ -174,9 +206,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
             if builder.cwlVersion == "v1.0":
         primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
             if builder.cwlVersion == "v1.0":
-                pattern = builder.do_eval(sf, context=primary)
+                pattern = sf
             else:
             else:
-                pattern = builder.do_eval(sf["pattern"], context=primary)
+                pattern = sf["pattern"]
             if pattern is None:
                 continue
             if isinstance(pattern, list):
             if pattern is None:
                 continue
             if isinstance(pattern, list):
@@ -213,7 +245,20 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                     "Expression must return list, object, string or null")
 
             if pattern is not None:
                     "Expression must return list, object, string or null")
 
             if pattern is not None:
-                sfpath = substitute(primary["location"], pattern)
+                if "${" in pattern or "$(" in pattern:
+                    sfname = builder.do_eval(pattern, context=primary)
+                else:
+                    sfname = substitute(primary["basename"], pattern)
+
+                if sfname is None:
+                    continue
+
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )
 
             required = builder.do_eval(required, context=primary)
 
 
             required = builder.do_eval(required, context=primary)
 
@@ -229,7 +274,7 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
-    elif inputschema["type"] not in primitive_types_set:
+    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
 
 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
 
 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
@@ -239,8 +284,9 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
-                        include_primary=True, discovered_secondaryfiles=None):
+                        workflowobj, uri, loadref_run, runtimeContext,
+                        include_primary=True, discovered_secondaryfiles=None,
+                        cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
@@ -259,13 +305,19 @@ def upload_dependencies(arvrunner, name, document_loader,
         defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
         defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
+            if cache is not None and defrg in cache:
+                return cache[defrg]
             # Use fetch_text to get raw file (before preprocessing).
             text = document_loader.fetch_text(defrg)
             if isinstance(text, bytes):
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
             # Use fetch_text to get raw file (before preprocessing).
             text = document_loader.fetch_text(defrg)
             if isinstance(text, bytes):
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
-            return yaml.safe_load(textIO)
+            yamlloader = YAML(typ='safe', pure=True)
+            result = yamlloader.load(textIO)
+            if cache is not None:
+                cache[defrg] = result
+            return result
         else:
             return {}
 
         else:
             return {}
 
@@ -282,11 +334,28 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     metadata = scanobj
 
 
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "$schemas", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
+
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                      loadref_fields,
+                                      set(("$schemas",)),
+                                      loadref, urljoin=document_loader.fetcher.urljoin,
+                                      nestdirs=False)
+
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
 
     sc = []
     uuids = {}
@@ -314,54 +383,48 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
         collect_uuids(obj)
 
             sc.append(obj)
         collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching from API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching from API server has nothing
+            # more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
 
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
-
     def visit_default(obj):
     def visit_default(obj):
-        remove = [False]
-        def ensure_default_location(f):
+        def defaults_are_optional(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
-        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+            normalizeFilesDirs(f)
+            optional_deps.append(f)
+        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
     find_defaults(workflowobj, visit_default)
 
 
     find_defaults(workflowobj, visit_default)
 
@@ -393,16 +456,24 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]
 
         else:
             del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
+
+    keeprefs = set()
+    def addkeepref(k):
+        if k.startswith("keep:"):
+            keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
     def setloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
 
     def setloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            addkeepref(p["location"])
             return
 
         if not loc:
             return
 
         if not loc:
@@ -424,7 +495,10 @@ def upload_dependencies(arvrunner, name, document_loader,
 
         gp = collection_uuid_pattern.match(loc)
         if not gp:
 
         gp = collection_uuid_pattern.match(loc)
         if not gp:
+            # Not a uuid pattern (must be a pdh pattern)
+            addkeepref(p["location"])
             return
             return
+
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
@@ -432,13 +506,46 @@ def upload_dependencies(arvrunner, name, document_loader,
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
+    if runtimeContext.copy_deps:
+        # Find referenced collections and copy them into the
+        # destination project, for easy sharing.
+        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+                                     filters=[["portable_data_hash", "in", list(keeprefs)],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
+                                     select=["uuid", "portable_data_hash", "created_at"]))
+
+        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+        for kr in keeprefs:
+            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+                                                  order="created_at desc",
+                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+                                                   limit=1).execute()
+            if len(col["items"]) == 0:
+                logger.warning("Cannot find collection with portable data hash %s", kr)
+                continue
+            col = col["items"][0]
+            try:
+                arvrunner.api.collections().create(body={"collection": {
+                    "owner_uuid": runtimeContext.project_uuid,
+                    "name": col["name"],
+                    "description": col["description"],
+                    "properties": col["properties"],
+                    "portable_data_hash": col["portable_data_hash"],
+                    "manifest_text": col["manifest_text"],
+                    "storage_classes_desired": col["storage_classes_desired"],
+                    "trash_at": col["trash_at"]
+                }}, ensure_unique_name=True).execute()
+            except Exception as e:
+                logger.warning("Unable copy collection to destination: %s", e)
+
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
@@ -449,30 +556,36 @@ def upload_dependencies(arvrunner, name, document_loader,
     return mapper
 
 
     return mapper
 
 
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
-                # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
+
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix)
+                                                       True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
 
 
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -501,9 +614,11 @@ def packed_workflow(arvrunner, tool, merged_map):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             arvrunner.project_uuid,
-                                                                                                             arvrunner.runtimeContext.force_docker_pull,
-                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix)
+                                                                                                             runtimeContext.project_uuid,
+                                                                                                             runtimeContext.force_docker_pull,
+                                                                                                             runtimeContext.tmp_outdir_prefix,
+                                                                                                             runtimeContext.match_local_docker,
+                                                                                                             runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -524,7 +639,7 @@ def tag_git_version(packed):
             packed["http://schema.org/version"] = githash
 
 
             packed["http://schema.org/version"] = githash
 
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
@@ -560,7 +675,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    False,
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
 
     if "id" in job_order:
         del job_order["id"]
@@ -574,26 +690,30 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    with Perf(metrics, "upload_docker"):
+        upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
     merged_map = {}
 
     document_loader = tool.doc_loader
 
     merged_map = {}
-
+    tool_dep_cache = {}
     def upload_tool_deps(deptool):
         if "id" in deptool:
             discovered_secondaryfiles = {}
     def upload_tool_deps(deptool):
         if "id" in deptool:
             discovered_secondaryfiles = {}
-            pm = upload_dependencies(arvrunner,
-                                     "%s dependencies" % (shortname(deptool["id"])),
-                                     document_loader,
-                                     deptool,
-                                     deptool["id"],
-                                     False,
-                                     include_primary=False,
-                                     discovered_secondaryfiles=discovered_secondaryfiles)
+            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+                pm = upload_dependencies(arvrunner,
+                                         "%s dependencies" % (shortname(deptool["id"])),
+                                         document_loader,
+                                         deptool,
+                                         deptool["id"],
+                                         False,
+                                         runtimeContext,
+                                         include_primary=False,
+                                         discovered_secondaryfiles=discovered_secondaryfiles,
+                                         cache=tool_dep_cache)
             document_loader.idx[deptool["id"]] = deptool
             toolmap = {}
             for k,v in pm.items():
             document_loader.idx[deptool["id"]] = deptool
             toolmap = {}
             for k,v in pm.items():
@@ -604,18 +724,22 @@ def upload_workflow_deps(arvrunner, tool):
 
     return merged_map
 
 
     return merged_map
 
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
-                                                          arvrunner.runtimeContext.force_docker_pull,
-                                                          arvrunner.runtimeContext.tmp_outdir_prefix)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True,
+                                                          runtimeContext.project_uuid,
+                                                          runtimeContext.force_docker_pull,
+                                                          runtimeContext.tmp_outdir_prefix,
+                                                          runtimeContext.match_local_docker,
+                                                          runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
@@ -624,15 +748,15 @@ def upload_workflow_collection(arvrunner, name, packed):
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
-    if arvrunner.project_uuid:
-        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    if runtimeContext.project_uuid:
+        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
-                            owner_uuid=arvrunner.project_uuid,
+                            owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())
@@ -764,3 +888,235 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
+
+
+# --- from cwltool ---
+
+
+CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
+
+
+def scandeps(
+    base: str,
+    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
+    reffields: Set[str],
+    urlfields: Set[str],
+    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
+    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
+    nestdirs: bool = True,
+    do_normalize: bool = True,
+) -> Optional[MutableSequence[CWLObjectType]]:
+
+    """Given a CWL document or input object, search for dependencies
+    (references to external files) of 'doc' and return them as a list
+    of File or Directory objects.
+
+    The 'base' is the base URL for relative references.
+
+    Looks for objects with 'class: File' or 'class: Directory' and
+    adds them to the list of dependencies.
+
+    Anything in 'urlfields' is also added as a File dependency.
+
+    Anything in 'reffields' (such as workflow step 'run') will be
+    added as a dependency and also loaded (using the 'loadref'
+    function) and recursively scanned for dependencies.  Those
+    dependencies will be added as secondary files to the primary file.
+
+    If "nestdirs" is true, create intermediate directory objects when
+    a file is located in a subdirectory under the starting directory.
+    This is so that if the dependencies are materialized, they will
+    produce the same relative file system locations.
+
+    """
+
+    r: Optional[MutableSequence[CWLObjectType]] = None
+    if isinstance(doc, MutableMapping):
+        if "id" in doc:
+            if cast(str, doc["id"]).startswith("file://"):
+                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
+                if base != df:
+                    if r is None:
+                        r = []
+                    r.append({"class": "File", "location": df, "format": CWL_IANA})
+                    base = df
+
+        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
+            with Perf(metrics, "File or Directory with location"):
+                u = cast(Optional[str], doc.get("location", doc.get("path")))
+                if u and not u.startswith("_:"):
+                    deps = {
+                        "class": doc["class"],
+                        "location": urljoin(base, u),
+                    }  # type: CWLObjectType
+                    if "basename" in doc:
+                        deps["basename"] = doc["basename"]
+                    if doc["class"] == "Directory" and "listing" in doc:
+                        deps["listing"] = doc["listing"]
+                    if doc["class"] == "File" and "secondaryFiles" in doc:
+                        sd = scandeps(
+                            base,
+                            cast(
+                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
+                                doc["secondaryFiles"],
+                            ),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        )
+                        if sd:
+                            deps["secondaryFiles"] = cast(
+                                CWLOutputAtomType,
+                                sd
+                            )
+                    if nestdirs:
+                        deps = nestdir(base, deps)
+                    if r is None:
+                        r = []
+                    r.append(deps)
+                else:
+                    if doc["class"] == "Directory" and "listing" in doc:
+                        sd = scandeps(
+                                base,
+                                cast(MutableSequence[CWLObjectType], doc["listing"]),
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                        if sd:
+                            if r is None:
+                                r = []
+                            r.extend(sd)
+                    elif doc["class"] == "File" and "secondaryFiles" in doc:
+                        sd = scandeps(
+                                base,
+                                cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                        if sd:
+                            if r is None:
+                                r = sd
+                            else:
+                                r.extend(sd)
+
+        for k, v in doc.items():
+            if k in reffields:
+                with Perf(metrics, "k in reffields"):
+                    for u2 in aslist(v):
+                        if isinstance(u2, MutableMapping):
+                            sd = scandeps(
+                                    base,
+                                    u2,
+                                    reffields,
+                                    urlfields,
+                                    loadref,
+                                    urljoin=urljoin,
+                                    nestdirs=nestdirs,
+                                    do_normalize=False,
+                                )
+                            if sd:
+                                if r is None:
+                                    r = sd
+                                else:
+                                    r.extend(sd)
+                        else:
+                            subid = urljoin(base, u2)
+                            basedf, _ = urllib.parse.urldefrag(base)
+                            subiddf, _ = urllib.parse.urldefrag(subid)
+                            if basedf == subiddf:
+                                continue
+                            sub = cast(
+                                Union[MutableSequence[CWLObjectType], CWLObjectType],
+                                loadref(base, u2),
+                            )
+                            deps2 = {
+                                "class": "File",
+                                "location": subid,
+                                "format": CWL_IANA,
+                            }  # type: CWLObjectType
+                            sf = scandeps(
+                                subid,
+                                sub,
+                                reffields,
+                                urlfields,
+                                loadref,
+                                urljoin=urljoin,
+                                nestdirs=nestdirs,
+                                do_normalize=False,
+                            )
+                            if sf:
+                                deps2["secondaryFiles"] = cast(
+                                    MutableSequence[CWLOutputAtomType], mergedirs(sf)
+                                )
+                            if nestdirs:
+                                deps2 = nestdir(base, deps2)
+                            if r is None:
+                                r = []
+                            r.append(deps2)
+            elif k in urlfields and k != "location":
+                with Perf(metrics, "k in urlfields"):
+                    for u3 in aslist(v):
+                        deps = {"class": "File", "location": urljoin(base, u3)}
+                        if nestdirs:
+                            deps = nestdir(base, deps)
+                        if r is None:
+                            r = []
+                        r.append(deps)
+            elif doc.get("class") in ("File", "Directory") and k in (
+                "listing",
+                "secondaryFiles",
+            ):
+                # should be handled earlier.
+                pass
+            else:
+                with Perf(metrics, "k is something else"):
+                    sd = scandeps(
+                            base,
+                            cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
+                            reffields,
+                            urlfields,
+                            loadref,
+                            urljoin=urljoin,
+                            nestdirs=nestdirs,
+                            do_normalize=False,
+                        )
+                    if sd:
+                        if r is None:
+                            r = sd
+                        else:
+                            r.extend(sd)
+    elif isinstance(doc, MutableSequence):
+        with Perf(metrics, "d in doc"):
+            for d in doc:
+                sd = scandeps(
+                        base,
+                        d,
+                        reffields,
+                        urlfields,
+                        loadref,
+                        urljoin=urljoin,
+                        nestdirs=nestdirs,
+                        do_normalize=False,
+                    )
+                if r is None:
+                    r = sd
+                else:
+                    r.extend(sd)
+
+    if r and do_normalize:
+        normalizeFilesDirs(r)
+
+    return r