Fix set_secondary recursion error
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 1d5f98f200c16bb444878da3b418f01b57bf1002..644713bce25385938df289dbdcb4cf68b77f3ca5 100644 (file)
@@ -2,34 +2,53 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from future import standard_library
+standard_library.install_aliases()
+from future.utils import  viewvalues, viewitems
+from past.builtins import basestring
+
 import os
 import os
-import urlparse
+import sys
+import re
+import urllib.parse
 from functools import partial
 import logging
 import json
 from functools import partial
 import logging
 import json
-import subprocess32 as subprocess
+import copy
 from collections import namedtuple
 from collections import namedtuple
+from io import StringIO
+from typing import Mapping, Sequence
 
 
-from StringIO import StringIO
+if os.name == "posix" and sys.version_info[0] < 3:
+    import subprocess32 as subprocess
+else:
+    import subprocess
 
 from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 
 from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+                             shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.builder import substitute
 from cwltool.pack import pack
+from cwltool.update import INTERNAL_VERSION
+from cwltool.builder import Builder
+import schema_salad.validate as validate
 
 import arvados.collection
 
 import arvados.collection
-import ruamel.yaml as yaml
+import arvados.util
+from .util import collectionUUID
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
 from ._version import __version__
 from . import done
 from ._version import __version__
 from . import done
+from . context import ArvRuntimeContext
 
 logger = logging.getLogger('arvados.cwl-runner')
 
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -61,27 +80,186 @@ def find_defaults(d, op):
         if "default" in d:
             op(d)
         else:
         if "default" in d:
             op(d)
         else:
-            for i in d.itervalues():
+            for i in viewvalues(d):
                 find_defaults(i, op)
 
                 find_defaults(i, op)
 
-def setSecondary(t, fileobj, discovered):
-    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
-        if "secondaryFiles" not in fileobj:
-            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
-            if discovered is not None:
-                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
-    elif isinstance(fileobj, list):
-        for e in fileobj:
-            setSecondary(t, e, discovered)
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
+    return Builder(
+                 job=joborder,
+                 files=[],               # type: List[Dict[Text, Text]]
+                 bindings=[],            # type: List[Dict[Text, Any]]
+                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
+                 names=None,               # type: Names
+                 requirements=requirements,        # type: List[Dict[Text, Any]]
+                 hints=hints,               # type: List[Dict[Text, Any]]
+                 resources={},           # type: Dict[str, int]
+                 mutation_manager=None,    # type: Optional[MutationManager]
+                 formatgraph=None,         # type: Optional[Graph]
+                 make_fs_access=None,      # type: Type[StdFsAccess]
+                 fs_access=None,           # type: StdFsAccess
+                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
+                 timeout=runtimeContext.eval_timeout,             # type: float
+                 debug=runtimeContext.debug,               # type: bool
+                 js_console=runtimeContext.js_console,          # type: bool
+                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
+                 loadListing="",         # type: Text
+                 outdir="",              # type: Text
+                 tmpdir="",              # type: Text
+                 stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
+                 container_engine="docker"
+                )
+
+def search_schemadef(name, reqs):
+    for r in reqs:
+        if r["class"] == "SchemaDefRequirement":
+            for sd in r["types"]:
+                if sd["name"] == name:
+                    return sd
+    return None
+
+primitive_types_set = frozenset(("null", "boolean", "int", "long",
+                                 "float", "double", "string", "record",
+                                 "array", "enum"))
+
+def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
+    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
+        # union type, collect all possible secondaryFiles
+        for i in inputschema:
+            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
+        return
+
+    if inputschema == "File":
+        inputschema = {"type": "File"}
+
+    if isinstance(inputschema, basestring):
+        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
+        if sd:
+            inputschema = sd
+        else:
+            return
+
+    if "secondaryFiles" in inputschema:
+        # set secondaryFiles, may be inherited by compound types.
+        secondaryspec = inputschema["secondaryFiles"]
+
+    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
+        not isinstance(inputschema["type"], basestring)):
+        # compound type (union, array, record)
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+    elif (inputschema["type"] == "record" and
+          isinstance(primary, Mapping)):
+        #
+        # record type, find secondary files associated with fields.
+        #
+        for f in inputschema["fields"]:
+            p = primary.get(shortname(f["name"]))
+            if p:
+                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "array" and
+          isinstance(primary, Sequence)):
+        #
+        # array type, find secondary files of elements
+        #
+        for p in primary:
+            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "File" and
+          isinstance(primary, Mapping) and
+          primary.get("class") == "File"):
+
+        if "secondaryFiles" in primary or not secondaryspec:
+            # Nothing to do.
+            return
+
+        #
+        # Found a file, check for secondaryFiles
+        #
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
+        for i, sf in enumerate(aslist(secondaryspec)):
+            if builder.cwlVersion == "v1.0":
+                pattern = sf
+            else:
+                pattern = sf["pattern"]
+            if pattern is None:
+                continue
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                if builder.cwlVersion == "v1.0":
+                    specs.append({"pattern": pattern, "required": True})
+                else:
+                    specs.append({"pattern": pattern, "required": sf.get("required")})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = None
+                    if sf.get("location") is None:
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "File object is missing 'location': %s" % sf)
+                    sfpath = sf["location"]
+                    required = True
+                else:
+                    pattern = sf["pattern"]
+                    required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
 
 
-def discover_secondary_files(inputs, job_order, discovered=None):
-    for t in inputs:
-        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
-            setSecondary(t, job_order[shortname(t["id"])], discovered)
+            if pattern is not None:
+                if "${" in pattern or "$(" in pattern:
+                    sfname = builder.do_eval(pattern, context=primary)
+                else:
+                    sfname = substitute(primary["basename"], pattern)
+
+                if sfname is None:
+                    continue
+
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )
 
 
+            required = builder.do_eval(required, context=primary)
+
+            if fsaccess.exists(sfpath):
+                if pattern is not None:
+                    found.append({"location": sfpath, "class": "File"})
+                else:
+                    found.append(sf)
+            elif required:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Required secondary file '%s' does not exist" % sfpath)
+
+        primary["secondaryFiles"] = cmap(found)
+        if discovered is not None:
+            discovered[primary["location"]] = primary["secondaryFiles"]
+    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
+    for inputschema in inputs:
+        primary = job_order.get(shortname(inputschema["id"]))
+        if isinstance(primary, (Mapping, Sequence)):
+            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
+                        workflowobj, uri, loadref_run, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
                         include_primary=True, discovered_secondaryfiles=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
@@ -98,7 +276,7 @@ def upload_dependencies(arvrunner, name, document_loader,
     loaded = set()
     def loadref(b, u):
         joined = document_loader.fetcher.urljoin(b, u)
     loaded = set()
     def loadref(b, u):
         joined = document_loader.fetcher.urljoin(b, u)
-        defrg, _ = urlparse.urldefrag(joined)
+        defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
             # Use fetch_text to get raw file (before preprocessing).
         if defrg not in loaded:
             loaded.add(defrg)
             # Use fetch_text to get raw file (before preprocessing).
@@ -107,7 +285,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
-            return yaml.safe_load(textIO)
+            yamlloader = YAML(typ='safe', pure=True)
+            return yamlloader.load(textIO)
         else:
             return {}
 
         else:
             return {}
 
@@ -117,61 +296,115 @@ def upload_dependencies(arvrunner, name, document_loader,
         loadref_fields = set(("$import",))
 
     scanobj = workflowobj
         loadref_fields = set(("$import",))
 
     scanobj = workflowobj
-    if "id" in workflowobj:
+    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
         # Need raw file content (before preprocessing) to ensure
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
         # Need raw file content (before preprocessing) to ensure
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
     sc_result = scandeps(uri, scanobj,
-                  loadref_fields,
-                  set(("$include", "$schemas", "location")),
-                  loadref, urljoin=document_loader.fetcher.urljoin)
+                         loadref_fields,
+                         set(("$include", "location")),
+                         loadref, urljoin=document_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    optional_deps = scandeps(uri, scanobj,
+                                  loadref_fields,
+                                  set(("$schemas",)),
+                                  loadref, urljoin=document_loader.fetcher.urljoin,
+                                  nestdirs=False)
+
+    sc_result.extend(optional_deps)
 
     sc = []
 
     sc = []
-    def only_real(obj):
-        # Only interested in local files than need to be uploaded,
-        # don't include file literals, keep references, etc.
-        sp = obj.get("location", "").split(":")
-        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if len(sp) < 1:
+            return
+        if sp[0] in ("file", "http", "https"):
+            # Record local files than need to be uploaded,
+            # don't include file literals, keep references, etc.
             sc.append(obj)
             sc.append(obj)
+        collect_uuids(obj)
+
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
+    uuid_map = {}
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
+        # For a large number of fetch_uuids, API server may limit
+        # response size, so keep fetching from API server has nothing
+        # more to give us.
+        lookups = arvrunner.api.collections().list(
+            filters=[["uuid", "in", fetch_uuids]],
+            count="none",
+            select=["uuid", "portable_data_hash"]).execute(
+                num_retries=arvrunner.num_retries)
 
 
-    visit_class(sc_result, ("File", "Directory"), only_real)
+        if not lookups["items"]:
+            break
+
+        for l in lookups["items"]:
+            uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
 
     normalizeFilesDirs(sc)
 
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
-
     def visit_default(obj):
     def visit_default(obj):
-        remove = [False]
-        def ensure_default_location(f):
+        def defaults_are_optional(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
-        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+            normalizeFilesDirs(f)
+            optional_deps.append(f)
+        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
     find_defaults(workflowobj, visit_default)
 
     discovered = {}
     def discover_default_secondary_files(obj):
 
     find_defaults(workflowobj, visit_default)
 
     discovered = {}
     def discover_default_secondary_files(obj):
-        discover_secondary_files(obj["inputs"],
-                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
+        builder_job_order = {}
+        for t in obj["inputs"]:
+            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
+        # Need to create a builder object to evaluate expressions.
+        builder = make_builder(builder_job_order,
+                               obj.get("hints", []),
+                               obj.get("requirements", []),
+                               ArvRuntimeContext(),
+                               metadata)
+        discover_secondary_files(arvrunner.fs_access,
+                                 builder,
+                                 obj["inputs"],
+                                 builder_job_order,
                                  discovered)
 
                                  discovered)
 
-    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
+    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
+    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
 
 
-    for d in list(discovered.keys()):
+    for d in list(discovered):
         # Only interested in discovered secondaryFiles which are local
         # files that need to be uploaded.
         if d.startswith("file:"):
         # Only interested in discovered secondaryFiles which are local
         # files that need to be uploaded.
         if d.startswith("file:"):
@@ -183,11 +416,50 @@ def upload_dependencies(arvrunner, name, document_loader,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
+
+    keeprefs = set()
+    def addkeepref(k):
+        if k.startswith("keep:"):
+            keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
     def setloc(p):
 
     def setloc(p):
-        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+        loc = p.get("location")
+        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
             p["location"] = mapper.mapper(p["location"]).resolved
+            addkeepref(p["location"])
+            return
+
+        if not loc:
+            return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned
+                # the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to be %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+        gp = collection_uuid_pattern.match(loc)
+        if not gp:
+            # Not a uuid pattern (must be a pdh pattern)
+            addkeepref(p["location"])
+            return
+
+        uuid = gp.groups()[0]
+        if uuid not in uuid_map:
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+        p[collectionUUID] = uuid
 
     visit_class(workflowobj, ("File", "Directory"), setloc)
     visit_class(discovered, ("File", "Directory"), setloc)
 
     visit_class(workflowobj, ("File", "Directory"), setloc)
     visit_class(discovered, ("File", "Directory"), setloc)
@@ -196,56 +468,111 @@ def upload_dependencies(arvrunner, name, document_loader,
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
+    if runtimeContext.copy_deps:
+        # Find referenced collections and copy them into the
+        # destination project, for easy sharing.
+        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+                                     filters=[["portable_data_hash", "in", list(keeprefs)],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
+                                     select=["uuid", "portable_data_hash", "created_at"]))
+
+        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+        for kr in keeprefs:
+            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+                                                  order="created_at desc",
+                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+                                                   limit=1).execute()
+            if len(col["items"]) == 0:
+                logger.warning("Cannot find collection with portable data hash %s", kr)
+                continue
+            col = col["items"][0]
+            try:
+                arvrunner.api.collections().create(body={"collection": {
+                    "owner_uuid": runtimeContext.project_uuid,
+                    "name": col["name"],
+                    "description": col["description"],
+                    "properties": col["properties"],
+                    "portable_data_hash": col["portable_data_hash"],
+                    "manifest_text": col["manifest_text"],
+                    "storage_classes_desired": col["storage_classes_desired"],
+                    "trash_at": col["trash_at"]
+                }}, ensure_unique_name=True).execute()
+            except Exception as e:
+                logger.warning("Unable copy collection to destination: %s", e)
+
     if "$schemas" in workflowobj:
     if "$schemas" in workflowobj:
-        sch = []
+        sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
         for s in workflowobj["$schemas"]:
-            sch.append(mapper.mapper(s).resolved)
+            if s in mapper:
+                sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
 
     return mapper
 
 
         workflowobj["$schemas"] = sch
 
     return mapper
 
 
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
-                # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
         else:
         else:
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
+                                                       True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
 
 
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
-    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+    packed = pack(arvrunner.loadingContext, tool.tool["id"],
+                  rewrite_out=rewrites,
+                  loader=tool.doc_loader)
 
 
-    rewrite_to_orig = {v: k for k,v in rewrites.items()}
+    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
 
     def visit(v, cur_id):
         if isinstance(v, dict):
-            if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
-            if "location" in v and not v["location"].startswith("keep:"):
-                v["location"] = merged_map[cur_id].resolved[v["location"]]
-            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
-                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
             if v.get("class") == "DockerRequirement":
-                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+                                                                                                             runtimeContext.project_uuid,
+                                                                                                             runtimeContext.force_docker_pull,
+                                                                                                             runtimeContext.tmp_outdir_prefix,
+                                                                                                             runtimeContext.match_local_docker,
+                                                                                                             runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -266,19 +593,44 @@ def tag_git_version(packed):
             packed["http://schema.org/version"] = githash
 
 
             packed["http://schema.org/version"] = githash
 
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
 
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
 
-    discover_secondary_files(tool.tool["inputs"], job_order)
+    # Make a copy of the job order and set defaults.
+    builder_job_order = copy.copy(job_order)
+
+    # fill_in_defaults throws an error if there are any
+    # missing required parameters, we don't want it to do that
+    # so make them all optional.
+    inputs_copy = copy.deepcopy(tool.tool["inputs"])
+    for i in inputs_copy:
+        if "null" not in i["type"]:
+            i["type"] = ["null"] + aslist(i["type"])
+
+    fill_in_defaults(inputs_copy,
+                     builder_job_order,
+                     arvrunner.fs_access)
+    # Need to create a builder object to evaluate expressions.
+    builder = make_builder(builder_job_order,
+                           tool.hints,
+                           tool.requirements,
+                           ArvRuntimeContext(),
+                           tool.metadata)
+    # Now update job_order with secondaryFiles
+    discover_secondary_files(arvrunner.fs_access,
+                             builder,
+                             tool.tool["inputs"],
+                             job_order)
 
     jobmapper = upload_dependencies(arvrunner,
                                     name,
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
 
     jobmapper = upload_dependencies(arvrunner,
                                     name,
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    False,
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
 
     if "id" in job_order:
         del job_order["id"]
@@ -292,10 +644,10 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
 
     document_loader = tool.doc_loader
 
@@ -310,6 +662,7 @@ def upload_workflow_deps(arvrunner, tool):
                                      deptool,
                                      deptool["id"],
                                      False,
                                      deptool,
                                      deptool["id"],
                                      False,
+                                     runtimeContext,
                                      include_primary=False,
                                      discovered_secondaryfiles=discovered_secondaryfiles)
             document_loader.idx[deptool["id"]] = deptool
                                      include_primary=False,
                                      discovered_secondaryfiles=discovered_secondaryfiles)
             document_loader.idx[deptool["id"]] = deptool
@@ -322,16 +675,22 @@ def upload_workflow_deps(arvrunner, tool):
 
     return merged_map
 
 
     return merged_map
 
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True,
+                                                          runtimeContext.project_uuid,
+                                                          runtimeContext.force_docker_pull,
+                                                          runtimeContext.tmp_outdir_prefix,
+                                                          runtimeContext.match_local_docker,
+                                                          runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
@@ -340,15 +699,15 @@ def upload_workflow_collection(arvrunner, name, packed):
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
-    if arvrunner.project_uuid:
-        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    if runtimeContext.project_uuid:
+        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
-                            owner_uuid=arvrunner.project_uuid,
+                            owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())
@@ -360,7 +719,8 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, loadingContext, enable_reuse,
+    def __init__(self, runner, updated_tool,
+                 tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
@@ -368,7 +728,10 @@ class Runner(Process):
                  collection_cache_size=256,
                  collection_cache_is_default=True):
 
                  collection_cache_size=256,
                  collection_cache_is_default=True):
 
-        super(Runner, self).__init__(tool.tool, loadingContext)
+        loadingContext = loadingContext.copy()
+        loadingContext.metadata = updated_tool.metadata.copy()
+
+        super(Runner, self).__init__(updated_tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool
 
         self.arvrunner = runner
         self.embedded_tool = tool
@@ -391,6 +754,7 @@ class Runner(Process):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
+        self.enable_dev = loadingContext.enable_dev
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
@@ -461,17 +825,17 @@ class Runner(Process):
                                                        keep_client=self.arvrunner.keep_client,
                                                        num_retries=self.arvrunner.num_retries)
             if "cwl.output.json" in outc:
                                                        keep_client=self.arvrunner.keep_client,
                                                        num_retries=self.arvrunner.num_retries)
             if "cwl.output.json" in outc:
-                with outc.open("cwl.output.json") as f:
+                with outc.open("cwl.output.json", "rb") as f:
                     if f.size() > 0:
                     if f.size() > 0:
-                        outputs = json.load(f)
+                        outputs = json.loads(f.read().decode())
             def keepify(fileobj):
                 path = fileobj["location"]
                 if not path.startswith("keep:"):
                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
             adjustFileObjs(outputs, keepify)
             adjustDirObjs(outputs, keepify)
             def keepify(fileobj):
                 path = fileobj["location"]
                 if not path.startswith("keep:"):
                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
             adjustFileObjs(outputs, keepify)
             adjustDirObjs(outputs, keepify)
-        except Exception as e:
-            logger.exception("[%s] While getting final output object: %s", self.name, e)
+        except Exception:
+            logger.exception("[%s] While getting final output object", self.name)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)