18994: Remove debug prints
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index a846f2b0016931dcd6a938c47f9572083963d2bc..ddd1d2bacfb1bc7048c4f2d92171e0908656b3a1 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -2,34 +2,52 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from future import standard_library
+standard_library.install_aliases()
+from future.utils import viewvalues, viewitems
+from past.builtins import basestring
+
 import os
-import urlparse
+import sys
+import re
+import urllib.parse
 from functools import partial
 import logging
 import json
-import subprocess32 as subprocess
+import copy
 from collections import namedtuple
+from io import StringIO
+from typing import Mapping, Sequence
 
-from StringIO import StringIO
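+# subprocess32 is a backport of the Python 3 subprocess module for
+# Python 2 on POSIX; everywhere else the standard library suffices.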
+if os.name == "posix" and sys.version_info[0] < 3:
+    import subprocess32 as subprocess
+else:
+    import subprocess
 
 from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+                             shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
+from cwltool.update import INTERNAL_VERSION
+from cwltool.builder import Builder
+import schema_salad.validate as validate
 
 import arvados.collection
-import ruamel.yaml as yaml
+from .util import collectionUUID
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
 from ._version import __version__
 from . import done
+from .context import ArvRuntimeContext
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -61,24 +79,164 @@ def find_defaults(d, op):
         if "default" in d:
             op(d)
         else:
-            for i in d.itervalues():
+            for i in viewvalues(d):
                 find_defaults(i, op)
 
-def setSecondary(t, fileobj, discovered):
-    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
-        if "secondaryFiles" not in fileobj:
-            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
-            if discovered is not None:
-                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
-    elif isinstance(fileobj, list):
-        for e in fileobj:
-            setSecondary(t, e, discovered)
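+# Build a stripped-down cwltool Builder with just enough state to
+# evaluate secondaryFiles pattern expressions outside a running job.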
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
+    return Builder(
+                 job=joborder,
+                 files=[],               # type: List[Dict[Text, Text]]
+                 bindings=[],            # type: List[Dict[Text, Any]]
+                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
+                 names=None,               # type: Names
+                 requirements=requirements,        # type: List[Dict[Text, Any]]
+                 hints=hints,               # type: List[Dict[Text, Any]]
+                 resources={},           # type: Dict[str, int]
+                 mutation_manager=None,    # type: Optional[MutationManager]
+                 formatgraph=None,         # type: Optional[Graph]
+                 make_fs_access=None,      # type: Type[StdFsAccess]
+                 fs_access=None,           # type: StdFsAccess
+                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
+                 timeout=runtimeContext.eval_timeout,             # type: float
+                 debug=runtimeContext.debug,               # type: bool
+                 js_console=runtimeContext.js_console,          # type: bool
+                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
+                 loadListing="",         # type: Text
+                 outdir="",              # type: Text
+                 tmpdir="",              # type: Text
+                 stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
+                 container_engine="docker"
+                )
+
+def search_schemadef(name, reqs):
+    for r in reqs:
+        if r["class"] == "SchemaDefRequirement":
+            for sd in r["types"]:
+                if sd["name"] == name:
+                    return sd
+    return None
+
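+# Schema type names at which set_secondary() stops recursing.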
+primitive_types_set = frozenset(("null", "boolean", "int", "long",
+                                 "float", "double", "string", "record",
+                                 "array", "enum"))
+
+def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
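+    # Recursively walk the input schema alongside the job order value:
+    # unions, named schema references, records and arrays recurse, while
+    # File values get their secondaryFiles patterns evaluated and the
+    # results checked for existence.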
+    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
+        # union type, collect all possible secondaryFiles
+        for i in inputschema:
+            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
+        return
+
+    if isinstance(inputschema, basestring):
+        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
+        if sd:
+            inputschema = sd
+        else:
+            return
+
+    if "secondaryFiles" in inputschema:
+        # set secondaryFiles, may be inherited by compound types.
+        secondaryspec = inputschema["secondaryFiles"]
+
+    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
+        not isinstance(inputschema["type"], basestring)):
+        # compound type (union, array, record)
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+    elif (inputschema["type"] == "record" and
+          isinstance(primary, Mapping)):
+        #
+        # record type, find secondary files associated with fields.
+        #
+        for f in inputschema["fields"]:
+            p = primary.get(shortname(f["name"]))
+            if p:
+                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "array" and
+          isinstance(primary, Sequence)):
+        #
+        # array type, find secondary files of elements
+        #
+        for p in primary:
+            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "File" and
+          secondaryspec and
+          isinstance(primary, Mapping) and
+          primary.get("class") == "File" and
+          "secondaryFiles" not in primary):
+        #
+        # Found a file, check for secondaryFiles
+        #
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
+        for i, sf in enumerate(aslist(secondaryspec)):
+            if builder.cwlVersion == "v1.0":
+                pattern = builder.do_eval(sf, context=primary)
+            else:
+                pattern = builder.do_eval(sf["pattern"], context=primary)
+            if pattern is None:
+                continue
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                if builder.cwlVersion == "v1.0":
+                    specs.append({"pattern": pattern, "required": True})
+                else:
+                    specs.append({"pattern": pattern, "required": sf.get("required")})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
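+        # Check each evaluated spec against the filesystem: missing
+        # optional files are skipped, missing required files are errors.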
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = None
+                    if sf.get("location") is None:
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "File object is missing 'location': %s" % sf)
+                    sfpath = sf["location"]
+                    required = True
+                else:
+                    pattern = sf["pattern"]
+                    required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+            if pattern is not None:
+                sfpath = substitute(primary["location"], pattern)
 
-def discover_secondary_files(inputs, job_order, discovered=None):
-    for t in inputs:
-        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
-            setSecondary(t, job_order[shortname(t["id"])], discovered)
+            required = builder.do_eval(required, context=primary)
 
+            if fsaccess.exists(sfpath):
+                if pattern is not None:
+                    found.append({"location": sfpath, "class": "File"})
+                else:
+                    found.append(sf)
+            elif required:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Required secondary file '%s' does not exist" % sfpath)
+
+        primary["secondaryFiles"] = cmap(found)
+        if discovered is not None:
+            discovered[primary["location"]] = primary["secondaryFiles"]
+    elif inputschema["type"] not in primitive_types_set:
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
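+    # Pair each workflow input schema with its value in the job order
+    # and fill in any secondaryFiles that can be discovered.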
+    for inputschema in inputs:
+        primary = job_order.get(shortname(inputschema["id"]))
+        if isinstance(primary, (Mapping, Sequence)):
+            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
@@ -98,7 +256,7 @@ def upload_dependencies(arvrunner, name, document_loader,
     loaded = set()
     def loadref(b, u):
         joined = document_loader.fetcher.urljoin(b, u)
-        defrg, _ = urlparse.urldefrag(joined)
+        defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
             # Use fetch_text to get raw file (before preprocessing).
@@ -107,7 +265,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
-            return yaml.safe_load(textIO)
+            yamlloader = YAML(typ='safe', pure=True)
+            return yamlloader.load(textIO)
         else:
             return {}
 
@@ -117,61 +276,126 @@ def upload_dependencies(arvrunner, name, document_loader,
         loadref_fields = set(("$import",))
 
     scanobj = workflowobj
-    if "id" in workflowobj:
+    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
         # Need raw file content (before preprocessing) to ensure
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
-                  loadref_fields,
-                  set(("$include", "$schemas", "location")),
-                  loadref, urljoin=document_loader.fetcher.urljoin)
+                         loadref_fields,
+                         set(("$include", "location")),
+                         loadref, urljoin=document_loader.fetcher.urljoin,
+                         nestdirs=False)
+
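+    # Collect $schemas separately; these are handed to the path mapper
+    # as optional dependencies rather than hard requirements.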
+    optional_deps = scandeps(uri, scanobj,
+                                  loadref_fields,
+                                  set(("$schemas",)),
+                                  loadref, urljoin=document_loader.fetcher.urljoin,
+                                  nestdirs=False)
+
+    sc_result.extend(optional_deps)
 
     sc = []
-    def only_real(obj):
-        # Only interested in local files than need to be uploaded,
-        # don't include file literals, keep references, etc.
-        sp = obj.get("location", "").split(":")
-        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if len(sp) < 1:
+            return
+        if sp[0] in ("file", "http", "https"):
+            # Record local files that need to be uploaded,
+            # don't include file literals, keep references, etc.
             sc.append(obj)
+        collect_uuids(obj)
 
-    visit_class(sc_result, ("File", "Directory"), only_real)
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
+    uuid_map = {}
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
+        # For a large number of fetch_uuids, the API server may limit
+        # response size, so keep fetching until the API server has
+        # nothing more to give us.
+        lookups = arvrunner.api.collections().list(
+            filters=[["uuid", "in", fetch_uuids]],
+            count="none",
+            select=["uuid", "portable_data_hash"]).execute(
+                num_retries=arvrunner.num_retries)
+
+        if not lookups["items"]:
+            break
+
+        for l in lookups["items"]:
+            uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
+    #if "$schemas" in workflowobj:
+    #    for s in workflowobj["$schemas"]:
+    #        sc.append({"class": "File", "location": s})
 
     def visit_default(obj):
-        remove = [False]
+        #remove = [False]
         def ensure_default_location(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
+            optional_deps.append(f)
+            #if "location" in f and not arvrunner.fs_access.exists(f["location"]):
+            #    # Doesn't exist, remove from list of dependencies to upload
+            #    sc[:] = [x for x in sc if x["location"] != f["location"]]
+            #    # Delete "default" from workflowobj
+            #    remove[0] = True
         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+        #if remove[0]:
+        #    del obj["default"]
 
     find_defaults(workflowobj, visit_default)
 
     discovered = {}
     def discover_default_secondary_files(obj):
-        discover_secondary_files(obj["inputs"],
-                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
+        builder_job_order = {}
+        for t in obj["inputs"]:
+            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
+        # Need to create a builder object to evaluate expressions.
+        builder = make_builder(builder_job_order,
+                               obj.get("hints", []),
+                               obj.get("requirements", []),
+                               ArvRuntimeContext(),
+                               metadata)
+        discover_secondary_files(arvrunner.fs_access,
+                                 builder,
+                                 obj["inputs"],
+                                 builder_job_order,
                                  discovered)
 
-    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
+    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
+    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
 
-    for d in list(discovered.keys()):
+    for d in list(discovered):
         # Only interested in discovered secondaryFiles which are local
         # files that need to be uploaded.
         if d.startswith("file:"):
@@ -183,11 +407,41 @@ def upload_dependencies(arvrunner, name, document_loader,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
 
     def setloc(p):
-        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+        loc = p.get("location")
+        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            return
+
+        if not loc:
+            return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned
+                # by the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to be %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+        gp = collection_uuid_pattern.match(loc)
+        if not gp:
+            return
+        uuid = gp.groups()[0]
+        if uuid not in uuid_map:
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+        p[collectionUUID] = uuid
 
     visit_class(workflowobj, ("File", "Directory"), setloc)
     visit_class(discovered, ("File", "Directory"), setloc)
@@ -197,9 +451,10 @@ def upload_dependencies(arvrunner, name, document_loader,
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
     if "$schemas" in workflowobj:
-        sch = []
+        sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
-            sch.append(mapper.mapper(s).resolved)
+            if s in mapper:
+                sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
 
     return mapper
@@ -215,9 +470,16 @@ def upload_docker(arvrunner, tool):
                 # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
+                                                       arvrunner.runtimeContext.force_docker_pull,
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker)
         else:
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
+                                                       True, arvrunner.project_uuid,
+                                                       arvrunner.runtimeContext.force_docker_pull,
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
@@ -229,23 +491,33 @@ def packed_workflow(arvrunner, tool, merged_map):
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
-    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+    packed = pack(arvrunner.loadingContext, tool.tool["id"],
+                  rewrite_out=rewrites,
+                  loader=tool.doc_loader)
 
-    rewrite_to_orig = {v: k for k,v in rewrites.items()}
+    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
-            if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
-            if "location" in v and not v["location"].startswith("keep:"):
-                v["location"] = merged_map[cur_id].resolved[v["location"]]
-            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
-                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field; add an 'id' or upgrade to cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
-                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+                                                                                                             arvrunner.project_uuid,
+                                                                                                             arvrunner.runtimeContext.force_docker_pull,
+                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                                                                             arvrunner.runtimeContext.match_local_docker)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -271,7 +543,31 @@ def upload_job_order(arvrunner, name, tool, job_order):
     object with 'location' updated to the proper keep references.
     """
 
-    discover_secondary_files(tool.tool["inputs"], job_order)
+    # Make a copy of the job order and set defaults.
+    builder_job_order = copy.copy(job_order)
+
+    # fill_in_defaults throws an error if there are any
+    # missing required parameters, we don't want it to do that
+    # so make them all optional.
+    inputs_copy = copy.deepcopy(tool.tool["inputs"])
+    for i in inputs_copy:
+        if "null" not in i["type"]:
+            i["type"] = ["null"] + aslist(i["type"])
+
+    fill_in_defaults(inputs_copy,
+                     builder_job_order,
+                     arvrunner.fs_access)
+    # Need to create a builder object to evaluate expressions.
+    builder = make_builder(builder_job_order,
+                           tool.hints,
+                           tool.requirements,
+                           ArvRuntimeContext(),
+                           tool.metadata)
+    # Now update job_order with secondaryFiles
+    discover_secondary_files(arvrunner.fs_access,
+                             builder,
+                             tool.tool["inputs"],
+                             job_order)
 
     jobmapper = upload_dependencies(arvrunner,
                                     name,
@@ -326,7 +622,10 @@ def arvados_jobs_image(arvrunner, img):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
+                                                          arvrunner.runtimeContext.force_docker_pull,
+                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                          arvrunner.runtimeContext.match_local_docker)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
@@ -356,23 +655,32 @@ def upload_workflow_collection(arvrunner, name, packed):
     return collection.portable_data_hash()
 
 
-class Runner(object):
+class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, job_order, enable_reuse,
+    def __init__(self, runner, updated_tool,
+                 tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
-                 priority=None, secret_store=None):
+                 priority=None, secret_store=None,
+                 collection_cache_size=256,
+                 collection_cache_is_default=True):
+
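+        # Work on a private copy of the loading context, carrying the
+        # updated tool's metadata into cwltool's Process initializer.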
+        loadingContext = loadingContext.copy()
+        loadingContext.metadata = updated_tool.metadata.copy()
+
+        super(Runner, self).__init__(updated_tool.tool, loadingContext)
+
         self.arvrunner = runner
-        self.tool = tool
-        self.job_order = job_order
+        self.embedded_tool = tool
+        self.job_order = None
         self.running = False
         if enable_reuse:
             # If reuse is permitted by command line arguments but
             # disabled by the workflow itself, disable it.
-            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
@@ -386,16 +694,20 @@ class Runner(object):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
+        self.enable_dev = loadingContext.enable_dev
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
+        self.collection_cache_size = collection_cache_size
 
-        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
         if runner_resource_req:
             if runner_resource_req.get("coresMin"):
                 self.submit_runner_cores = runner_resource_req["coresMin"]
             if runner_resource_req.get("ramMin"):
                 self.submit_runner_ram = runner_resource_req["ramMin"]
+            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
+                self.collection_cache_size = runner_resource_req["keep_cache"]
 
         if submit_runner_ram:
             # Command line / initializer overrides default and/or spec from workflow
@@ -409,6 +721,15 @@ class Runner(object):
 
         self.merged_map = merged_map or {}
 
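+    # Entry point for the cwltool Process API: record the job order,
+    # initialize the job from the runtime context, and yield self as
+    # the runnable item.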
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):  # type: (...) -> Generator[Any, None, None]
+        self.job_order = job_order
+        self._init_job(job_order, runtimeContext)
+        yield self
+
     def update_pipeline_component(self, record):
         pass
 
@@ -444,17 +765,17 @@ class Runner(object):
                                                        keep_client=self.arvrunner.keep_client,
                                                        num_retries=self.arvrunner.num_retries)
             if "cwl.output.json" in outc:
-                with outc.open("cwl.output.json") as f:
+                with outc.open("cwl.output.json", "rb") as f:
                     if f.size() > 0:
-                        outputs = json.load(f)
+                        outputs = json.loads(f.read().decode())
             def keepify(fileobj):
                 path = fileobj["location"]
                 if not path.startswith("keep:"):
                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
             adjustFileObjs(outputs, keepify)
             adjustDirObjs(outputs, keepify)
-        except Exception as e:
-            logger.exception("[%s] While getting final output object: %s", self.name, e)
+        except Exception:
+            logger.exception("[%s] While getting final output object", self.name)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)