20933: Copies collections reported by a-c-r (arvados-cwl-runner)
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 7d4310b0e0ce94b9430ded7f60ca04416e2964b9..860f8a1b9a5f67966dca0041df937981aed9b330 100644 (file)
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -30,26 +53,30 @@ from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
+import arvados.util
 from .util import collectionUUID
 from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
+from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
@@ -128,6 +155,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
         return
 
+    if inputschema == "File":
+        inputschema = {"type": "File"}
+
     if isinstance(inputschema, basestring):
         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
         if sd:
@@ -163,10 +193,13 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
 
     elif (inputschema["type"] == "File" and
-          secondaryspec and
           isinstance(primary, Mapping) and
-          primary.get("class") == "File" and
-          "secondaryFiles" not in primary):
+          primary.get("class") == "File"):
+
+        if "secondaryFiles" in primary or not secondaryspec:
+            # Nothing to do.
+            return
+
         #
         # Found a file, check for secondaryFiles
         #
@@ -174,9 +207,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
             if builder.cwlVersion == "v1.0":
-                pattern = builder.do_eval(sf, context=primary)
+                pattern = sf
             else:
-                pattern = builder.do_eval(sf["pattern"], context=primary)
+                pattern = sf["pattern"]
             if pattern is None:
                 continue
             if isinstance(pattern, list):
@@ -213,23 +246,46 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                     "Expression must return list, object, string or null")
 
             if pattern is not None:
-                sfpath = substitute(primary["location"], pattern)
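+                # If the pattern is an expression, evaluate it against the
+                # primary file; otherwise apply it as a suffix substitution
+                # on the primary file's basename.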
+                if "${" in pattern or "$(" in pattern:
+                    sfname = builder.do_eval(pattern, context=primary)
+                else:
+                    sfname = substitute(primary["basename"], pattern)
+
+                if sfname is None:
+                    continue
+
+                if isinstance(sfname, str):
+                    p_location = primary["location"]
+                    if "/" in p_location:
+                        sfpath = (
+                            p_location[0 : p_location.rindex("/") + 1]
+                            + sfname
+                        )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
-    elif inputschema["type"] not in primitive_types_set:
+    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
 
 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
@@ -239,63 +295,44 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
-                        include_primary=True, discovered_secondaryfiles=None):
+                        workflowobj, uri, runtimeContext,
+                        include_primary=True, discovered_secondaryfiles=None,
+                        cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
     does an in-place update of references in "workflowobj".
 
-    Use scandeps to find $import, $include, $schemas, run, File and Directory
+    Use scandeps to find $schemas, File and Directory
     fields that represent external references.
 
     If workflowobj has an "id" field, this will reload the document to ensure
     it is scanning the raw document prior to preprocessing.
     """
 
-    loaded = set()
-    def loadref(b, u):
-        joined = document_loader.fetcher.urljoin(b, u)
-        defrg, _ = urllib.parse.urldefrag(joined)
-        if defrg not in loaded:
-            loaded.add(defrg)
-            # Use fetch_text to get raw file (before preprocessing).
-            text = document_loader.fetch_text(defrg)
-            if isinstance(text, bytes):
-                textIO = StringIO(text.decode('utf-8'))
-            else:
-                textIO = StringIO(text)
-            yamlloader = YAML(typ='safe', pure=True)
-            return yamlloader.load(textIO)
-        else:
-            return {}
-
-    if loadref_run:
-        loadref_fields = set(("$import", "run"))
-    else:
-        loadref_fields = set(("$import",))
-
     scanobj = workflowobj
-    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
-
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps"):
+        sc_result = scandeps(uri, scanobj,
+                             set(),
+                             set(("location",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
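+        # $schemas references are collected separately and passed to the
+        # path mapper as optional dependencies.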
+        optional_deps = scandeps(uri, scanobj,
+                             set(),
+                             set(("$schemas",)),
+                             None, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
+
+    if sc_result is None:
+        sc_result = []
 
-    optional_deps = scandeps(uri, scanobj,
-                                  loadref_fields,
-                                  set(("$schemas",)),
-                                  loadref, urljoin=document_loader.fetcher.urljoin,
-                                  nestdirs=False)
+    if optional_deps is None:
+        optional_deps = []
 
-    sc_result.extend(optional_deps)
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -323,35 +360,45 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
         collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching from API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, the API server may limit
+            # the response size, so keep fetching until the API server has
+            # nothing more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -392,17 +439,27 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
+
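+    # Also register collection UUID references in the path map, resolved to
+    # their portable data hashes, so setloc() can rewrite them later.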
+    for k, v in uuid_map.items():
+        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
 
-    def setloc(p):
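+    # Portable data hashes of every referenced collection, used below when
+    # copy_deps is enabled to copy them into the destination project.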
+    keeprefs = set()
+    def addkeepref(k):
+        if k.startswith("keep:"):
+            keeprefs.add(collection_pdh_pattern.match(k).group(1))
+
+
+    def collectloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
-            p["location"] = mapper.mapper(p["location"]).resolved
+            addkeepref(p["location"])
             return
 
         if not loc:
@@ -424,21 +481,56 @@ def upload_dependencies(arvrunner, name, document_loader,
 
         gp = collection_uuid_pattern.match(loc)
         if not gp:
+            # Not a uuid pattern (must be a pdh pattern)
+            addkeepref(p["location"])
             return
+
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
                 "Collection uuid %s not found" % uuid)
-        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
-        p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "collectloc"):
+        visit_class(workflowobj, ("File", "Directory"), collectloc)
+        visit_class(discovered, ("File", "Directory"), collectloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
+    if runtimeContext.copy_deps:
+        # Find referenced collections and copy them into the
+        # destination project, for easy sharing.
+        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+                                     filters=[["portable_data_hash", "in", list(keeprefs)],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
+                                     select=["uuid", "portable_data_hash", "created_at"]))
+
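+        # Skip collections that are already present in the destination project.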
+        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+        for kr in keeprefs:
+            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+                                                  order="created_at desc",
+                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+                                                   limit=1).execute()
+            if len(col["items"]) == 0:
+                logger.warning("Cannot find collection with portable data hash %s", kr)
+                continue
+            col = col["items"][0]
+            col["name"] = arvados.util.trim_name(col["name"])
+            try:
+                arvrunner.api.collections().create(body={"collection": {
+                    "owner_uuid": runtimeContext.project_uuid,
+                    "name": col["name"],
+                    "description": col["description"],
+                    "properties": col["properties"],
+                    "portable_data_hash": col["portable_data_hash"],
+                    "manifest_text": col["manifest_text"],
+                    "storage_classes_desired": col["storage_classes_desired"],
+                    "trash_at": col["trash_at"]
+                }}, ensure_unique_name=True).execute()
+            except Exception as e:
+                logger.warning("Unable to copy collection to destination: %s", e)
+
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
@@ -449,32 +541,26 @@ def upload_dependencies(arvrunner, name, document_loader,
     return mapper
 
 
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
-                # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker)
+
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker)
+                                                       True, runtimeContext)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -503,16 +589,18 @@ def packed_workflow(arvrunner, tool, merged_map):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             arvrunner.project_uuid,
-                                                                                                             arvrunner.runtimeContext.force_docker_pull,
-                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                                                                             arvrunner.runtimeContext.match_local_docker)
+                                                                                                             runtimeContext)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
             for l in v:
                 visit(l, cur_id)
     visit(packed, None)
+
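+    # Record any collected git provenance metadata on the packed workflow document.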
+    if git_info:
+        for g in git_info:
+            packed[g] = git_info[g]
+
     return packed
 
 
@@ -526,8 +614,75 @@ def tag_git_version(packed):
         else:
             packed["http://schema.org/version"] = githash
 
+def setloc(mapper, p):
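+    # Rewrite a File/Directory location to its resolved keep: reference using
+    # the path mapper, checking any collectionUUID hint against the portable
+    # data hash reported by the API server.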
+    loc = p.get("location")
+    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+        p["location"] = mapper.mapper(p["location"]).resolved
+        return
+
+    if not loc:
+        return
+
+    if collectionUUID in p:
+        uuid = p[collectionUUID]
+        keepuuid = "keep:"+uuid
+        if keepuuid not in mapper:
+            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        gp = collection_pdh_pattern.match(loc)
+        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
+            # This file entry has both collectionUUID and a PDH
+            # location. If the PDH doesn't match the one returned by
+            # the API server, raise an error.
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Expected collection uuid %s to be %s but API server reported %s" % (
+                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
+
+    gp = collection_uuid_pattern.match(loc)
+    if not gp:
+        # Not a uuid pattern (must be a pdh pattern)
+        return
+
+    uuid = gp.groups()[0]
+    keepuuid = "keep:"+uuid
+    if keepuuid not in mapper:
+        raise SourceLine(p, "location", validate.ValidationException).makeError(
+            "Collection uuid %s not found" % uuid)
+    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
+    p[collectionUUID] = uuid
+
+def update_from_mapper(workflowobj, mapper):
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def apply_merged_map(merged_map, workflowobj):
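+    # Walk the document and substitute the resolved keep: locations and
+    # discovered secondaryFiles recorded in merged_map.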
+    def visit(v, cur_id):
+        if isinstance(v, dict):
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if "id" in v:
+                    cur_id = v["id"]
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            #if v.get("class") == "DockerRequirement":
+            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+            #                                                                                                 runtimeContext)
+            for l in v:
+                visit(v[l], cur_id)
+        if isinstance(v, list):
+            for l in v:
+                visit(l, cur_id)
+    visit(workflowobj, None)
+
+def update_from_merged_map(tool, merged_map):
+    tool.visit(partial(apply_merged_map, merged_map))
+
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
@@ -558,12 +713,15 @@ def upload_job_order(arvrunner, name, tool, job_order):
                              tool.tool["inputs"],
                              job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
@@ -573,53 +731,67 @@ def upload_job_order(arvrunner, name, tool, job_order):
     if "job_order" in job_order:
         del job_order["job_order"]
 
-    return job_order
+    update_from_mapper(job_order, jobmapper)
+
+    return job_order, jobmapper
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    with Perf(metrics, "upload_docker"):
+        upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
     merged_map = {}
+    tool_dep_cache = {}
 
+    todo = []
+
+    # Standard traversal is top down; we want to go bottom up, so use
+    # the visitor to accumulate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
+            todo.append(deptool)
+
+    tool.visit(upload_tool_deps)
+
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
             pm = upload_dependencies(arvrunner,
                                      "%s dependencies" % (shortname(deptool["id"])),
                                      document_loader,
                                      deptool,
                                      deptool["id"],
-                                     False,
+                                     runtimeContext,
                                      include_primary=False,
-                                     discovered_secondaryfiles=discovered_secondaryfiles)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
 
-    tool.visit(upload_tool_deps)
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
 
     return merged_map
 
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
-                                                          arvrunner.runtimeContext.force_docker_pull,
-                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                          arvrunner.runtimeContext.match_local_docker)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True, runtimeContext)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
@@ -628,15 +800,15 @@ def upload_workflow_collection(arvrunner, name, packed):
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
-    if arvrunner.project_uuid:
-        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    if runtimeContext.project_uuid:
+        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
-                            owner_uuid=arvrunner.project_uuid,
+                            owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())
@@ -648,19 +820,19 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, updated_tool,
+    def __init__(self, runner,
                  tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
-                 collection_cache_is_default=True):
+                 collection_cache_is_default=True,
+                 git_info=None):
 
-        loadingContext = loadingContext.copy()
-        loadingContext.metadata = updated_tool.metadata.copy()
+        self.loadingContext = loadingContext.copy()
 
-        super(Runner, self).__init__(updated_tool.tool, loadingContext)
+        super(Runner, self).__init__(tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool
@@ -672,6 +844,9 @@ class Runner(Process):
             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
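+            # CWL v1.1+ expresses the same thing with the standard
+            # WorkReuse requirement.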
+            reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
+            if reuse_req:
+                enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
         self.uuid = None
         self.final_output = None
@@ -683,7 +858,9 @@ class Runner(Process):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
-        self.enable_dev = loadingContext.enable_dev
+        self.enable_dev = self.loadingContext.enable_dev
+        self.git_info = git_info
+        self.fast_parser = self.loadingContext.fast_parser
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
@@ -746,7 +923,8 @@ class Runner(Process):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
+                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
+                             include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
 
             self.final_output = record["output"]
             outc = arvados.collection.CollectionReader(self.final_output,
@@ -768,3 +946,33 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
+def print_keep_deps_visitor(references, doc_loader, tool):
+    def collect_locators(obj):
+        loc = obj.get("location", "")
+
+        g = arvados.util.keepuri_pattern.match(loc)
+        if g and g[1] not in references:
+            references.append(g[1])
+            return
+
+        loc = obj.get("http://arvados.org/cwl#dockerCollectionPDH", "") or obj.get("acrContainerImage")
+        if loc:
+            references.append(loc)
+
+    sc_result = scandeps(tool["id"], tool,
+                         set(),
+                         set(("location", "id")),
+                         None, urljoin=doc_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    visit_class(sc_result, ("File", "Directory"), collect_locators)
+    visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)
+
+
+def print_keep_deps(tool):
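+    # Print, as a JSON list, the keep references used by the tool, including
+    # Docker image collections.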
+    references = []
+
+    tool.visit(partial(print_keep_deps_visitor, references, tool.doc_loader))
+    print(json.dumps(references))
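
For reference, a minimal caller sketch (hypothetical, not part of this commit) of how the refactored helpers now take an explicit runtime context instead of reading options off the runner; arvrunner, tool and job_order are assumed to be the usual ArvCwlExecutor, loaded cwltool Process and input object:

    # Hypothetical usage sketch based on the signatures above.
    runtimeContext = ArvRuntimeContext()
    runtimeContext.project_uuid = "zzzzz-j7d0g-0123456789abcde"  # example destination project
    runtimeContext.copy_deps = True  # copy referenced collections into that project

    merged_map = upload_workflow_deps(arvrunner, tool, runtimeContext)
    job_order, jobmapper = upload_job_order(arvrunner, "input object", tool, job_order, runtimeContext)
    update_from_merged_map(tool, merged_map)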