Merge branch '19109-upload-secondary' refs #19109
[arvados.git] sdk/cwl/arvados_cwl/runner.py
index ea4ecf2b6d78cdefababfa94b923a5f4183338d5..f232178c5d5400ab90aad300d2f1d2d70909b98d 100644
@@ -31,8 +31,7 @@ import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
@@ -40,8 +39,9 @@ from cwltool.builder import Builder
 import schema_salad.validate as validate
 
 import arvados.collection
+import arvados.util
 from .util import collectionUUID
-import ruamel.yaml as yaml
+from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
@@ -83,7 +83,7 @@ def find_defaults(d, op):
             for i in viewvalues(d):
                 find_defaults(i, op)
 
-def make_builder(joborder, hints, requirements, runtimeContext):
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
     return Builder(
                  job=joborder,
                  files=[],               # type: List[Dict[Text, Text]]
@@ -106,6 +106,8 @@ def make_builder(joborder, hints, requirements, runtimeContext):
                  outdir="",              # type: Text
                  tmpdir="",              # type: Text
                  stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
+                 container_engine="docker"
                 )
 
 def search_schemadef(name, reqs):
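
Note: make_builder() now threads the document metadata into the Builder so expression evaluation can respect the workflow's declared CWL version. The cwltool#original_cwlVersion annotation, when present, records the version the author actually wrote before cwltool's internal upgrade, so it takes precedence. A minimal sketch of the fallback, with hypothetical metadata:

    # Hypothetical metadata dict; cwltool records the pre-upgrade version
    # under the cwltool#original_cwlVersion key when it updates a document.
    metadata = {
        "cwlVersion": "v1.2",
        "http://commonwl.org/cwltool#original_cwlVersion": "v1.0",
    }

    cwl_version = (metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                   or metadata.get("cwlVersion"))
    assert cwl_version == "v1.0"
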
@@ -127,6 +129,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
         return
 
+    if inputschema == "File":
+        inputschema = {"type": "File"}
+
     if isinstance(inputschema, basestring):
         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
         if sd:
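
Note: CWL permits the bare string "File" as a type shorthand; normalizing it to a mapping up front lets the inputschema["type"] == "File" branch below handle both spellings. A one-line sketch:

    inputschema = "File"
    if inputschema == "File":
        inputschema = {"type": "File"}
    assert inputschema["type"] == "File"
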
@@ -162,28 +167,86 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
 
     elif (inputschema["type"] == "File" and
-          secondaryspec and
           isinstance(primary, Mapping) and
-          primary.get("class") == "File" and
-          "secondaryFiles" not in primary):
+          primary.get("class") == "File"):
+
+        if "secondaryFiles" in primary or not secondaryspec:
+            # Nothing to do.
+            return
+
         #
         # Found a file, check for secondaryFiles
         #
-        primary["secondaryFiles"] = []
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
-            pattern = builder.do_eval(sf["pattern"], context=primary)
+            if builder.cwlVersion == "v1.0":
+                pattern = sf
+            else:
+                pattern = sf["pattern"]
             if pattern is None:
                 continue
-            sfpath = substitute(primary["location"], pattern)
-            required = builder.do_eval(sf.get("required"), context=primary)
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                if builder.cwlVersion == "v1.0":
+                    specs.append({"pattern": pattern, "required": True})
+                else:
+                    specs.append({"pattern": pattern, "required": sf.get("required")})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = None
+                    if sf.get("location") is None:
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "File object is missing 'location': %s" % sf)
+                    sfpath = sf["location"]
+                    required = True
+                else:
+                    pattern = sf["pattern"]
+                    required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+            if pattern is not None:
+                if "${" in pattern or "$(" in pattern:
+                    sfname = builder.do_eval(pattern, context=primary)
+                else:
+                    sfname = substitute(primary["basename"], pattern)
+
+                if sfname is None:
+                    continue
+
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )
+
+            required = builder.do_eval(required, context=primary)
 
             if fsaccess.exists(sfpath):
-                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+                if pattern is not None:
+                    found.append({"location": sfpath, "class": "File"})
+                else:
+                    found.append(sf)
             elif required:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Required secondary file '%s' does not exist" % sfpath)
 
-        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+        primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
     elif inputschema["type"] not in primitive_types_set:
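
Note: the rewritten branch works in two passes: each secondaryFiles entry is first normalized into a {"pattern", "required"} spec (v1.0 entries are bare strings, v1.1+ entries are mappings), then each pattern is resolved against the primary file's basename, calling do_eval() only when the pattern contains an expression. A condensed sketch of the resolution step, using a simplified stand-in for cwltool.builder.substitute:

    # Simplified stand-in for cwltool.builder.substitute: each leading "^"
    # strips one extension from the primary basename before appending.
    def substitute(value, pattern):
        if pattern.startswith("^"):
            return substitute(value.rsplit(".", 1)[0], pattern[1:])
        return value + pattern

    primary = {"class": "File",
               "location": "keep:pdh+123/reads.fastq.gz",
               "basename": "reads.fastq.gz"}
    # secondary files are looked up next to the primary file
    prefix = primary["location"][:primary["location"].rindex("/") + 1]

    for pattern in [".bai", "^.crai", "^^.idx"]:
        print(prefix + substitute(primary["basename"], pattern))
    # keep:pdh+123/reads.fastq.gz.bai
    # keep:pdh+123/reads.fastq.crai
    # keep:pdh+123/reads.idx
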
@@ -196,7 +259,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
+                        workflowobj, uri, loadref_run, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
@@ -222,7 +285,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
-            return yaml.safe_load(textIO)
+            yamlloader = YAML(typ='safe', pure=True)
+            return yamlloader.load(textIO)
         else:
             return {}
 
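
Note: newer ruamel.yaml releases deprecate the module-level safe_load() in favor of the instance API; YAML(typ='safe', pure=True) gives the same safe-loading semantics using the pure-Python loader. Equivalence sketch:

    from io import StringIO
    from ruamel.yaml import YAML

    textIO = StringIO("cwlVersion: v1.2\nclass: CommandLineTool\n")
    yamlloader = YAML(typ='safe', pure=True)
    doc = yamlloader.load(textIO)
    assert doc["class"] == "CommandLineTool"
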
@@ -237,10 +301,21 @@ def upload_dependencies(arvrunner, name, document_loader,
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
-                  loadref_fields,
-                  set(("$include", "$schemas", "location")),
-                  loadref, urljoin=document_loader.fetcher.urljoin)
+                         loadref_fields,
+                         set(("$include", "location")),
+                         loadref, urljoin=document_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    optional_deps = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$schemas",)),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
+
+    sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
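
Note: the dependency scan is now split so that $schemas references are best effort: a vocabulary file that has gone missing no longer aborts the run, while $include and location references remain hard requirements. A plain-data sketch of the split (example locations are hypothetical):

    # Required deps must resolve; optional deps ($schemas) are uploaded
    # when reachable and skipped otherwise.
    sc_result = [{"class": "File", "location": "keep:pdh+1/tool.cwl"}]
    optional_deps = [{"class": "File", "location": "https://example.org/edam.owl"}]
    sc_result.extend(optional_deps)

    required = [d for d in sc_result if d not in optional_deps]
    assert len(required) == 1
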
@@ -298,24 +373,14 @@ def upload_dependencies(arvrunner, name, document_loader,
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
-
     def visit_default(obj):
-        remove = [False]
-        def ensure_default_location(f):
+        def defaults_are_optional(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
-        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+            normalizeFilesDirs(f)
+            optional_deps.append(f)
+        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
     find_defaults(workflowobj, visit_default)
 
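
Note: previously a default File that could not be found was deleted from the workflow document so the runner would not fail on it; now defaults are normalized and recorded as optional dependencies, which uploads them when present and quietly skips them when absent. A sketch of the visitor (normalizeFilesDirs omitted):

    optional_deps = []

    def defaults_are_optional(f):
        # accept the legacy "path" spelling before recording the default
        if "location" not in f and "path" in f:
            f["location"] = f.pop("path")
        optional_deps.append(f)

    obj = {"default": {"class": "File", "path": "/tmp/reference.fa"}}
    defaults_are_optional(obj["default"])
    assert optional_deps[0]["location"] == "/tmp/reference.fa"
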
@@ -328,7 +393,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         builder = make_builder(builder_job_order,
                                obj.get("hints", []),
                                obj.get("requirements", []),
-                               ArvRuntimeContext())
+                               ArvRuntimeContext(),
+                               metadata)
         discover_secondary_files(arvrunner.fs_access,
                                  builder,
                                  obj["inputs"],
@@ -350,12 +416,19 @@ def upload_dependencies(arvrunner, name, document_loader,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
+
+    keeprefs = set()
+    def addkeepref(k):
+        if k.startswith("keep:"):
+            keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
     def setloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
+            addkeepref(p["location"])
             return
 
         if not loc:
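
Note: addkeepref() collects the portable data hash of every keep: reference the path mapper produces; the resulting set feeds the copy_deps pass below. collection_pdh_pattern is defined elsewhere in this module; a regex along these lines (an assumption, shown for illustration) captures the PDH out of a keep: URI:

    import re

    # Assumed shape of collection_pdh_pattern: 32 hex digits plus a size hint.
    collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.+)?$')

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    addkeepref("keep:99999999999999999999999999999998+99/input.fastq")
    assert keeprefs == {"99999999999999999999999999999998+99"}
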
@@ -377,7 +450,10 @@ def upload_dependencies(arvrunner, name, document_loader,
 
         gp = collection_uuid_pattern.match(loc)
         if not gp:
+            # Not a uuid pattern (must be a pdh pattern)
+            addkeepref(p["location"])
             return
+
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
@@ -392,34 +468,78 @@ def upload_dependencies(arvrunner, name, document_loader,
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
+    if runtimeContext.copy_deps:
+        # Find referenced collections and copy them into the
+        # destination project, for easy sharing.
+        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+                                     filters=[["portable_data_hash", "in", list(keeprefs)],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
+                                     select=["uuid", "portable_data_hash", "created_at"]))
+
+        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+        for kr in keeprefs:
+            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+                                                   order="created_at desc",
+                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+                                                   limit=1).execute()
+            if len(col["items"]) == 0:
+                logger.warning("Cannot find collection with portable data hash %s", kr)
+                continue
+            col = col["items"][0]
+            try:
+                arvrunner.api.collections().create(body={"collection": {
+                    "owner_uuid": runtimeContext.project_uuid,
+                    "name": col["name"],
+                    "description": col["description"],
+                    "properties": col["properties"],
+                    "portable_data_hash": col["portable_data_hash"],
+                    "manifest_text": col["manifest_text"],
+                    "storage_classes_desired": col["storage_classes_desired"],
+                    "trash_at": col["trash_at"]
+                }}, ensure_unique_name=True).execute()
+            except Exception as e:
+                logger.warning("Unable to copy collection to destination: %s", e)
+
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
-            sch.append(mapper.mapper(s).resolved)
+            if s in mapper:
+                sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
 
     return mapper
 
 
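
Note: with --copy-deps, every referenced collection is copied into the destination project so a shared workflow stays self-contained; keyset_list_all() pages through the collections list to find which PDHs the project already owns, and only the remainder are copied. A sketch of the duplicate filtering with stubbed API results (hypothetical PDH values):

    keeprefs = {"pdh1+100", "pdh2+200"}
    already_present = [{"uuid": "zzzzz-4zz18-000000000000000",
                        "portable_data_hash": "pdh1+100"}]

    # copy only collections the destination project does not already own
    keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
    assert keeprefs == {"pdh2+200"}
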
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
-                # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
         else:
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
+                                                       True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
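
Note: upload_docker() and its callers now read per-invocation settings from the runtime context instead of attributes on arvrunner. A minimal stand-in showing the fields these helpers use (the real ArvRuntimeContext carries many more settings; names below mirror the diff, the values are hypothetical):

    from dataclasses import dataclass

    @dataclass
    class FakeRuntimeContext:                      # hypothetical stand-in
        project_uuid: str = "zzzzz-j7d0g-000000000000000"
        force_docker_pull: bool = False
        tmp_outdir_prefix: str = "/tmp/"
        match_local_docker: bool = False
        copy_deps: bool = True

    # Passing the context explicitly lets per-run flags like --copy-deps
    # reach the image upload path without touching global runner state.
    ctx = FakeRuntimeContext()
    assert ctx.copy_deps
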
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -433,19 +553,26 @@ def packed_workflow(arvrunner, tool, merged_map):
 
     def visit(v, cur_id):
         if isinstance(v, dict):
-            if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
             if "path" in v and "location" not in v:
                 v["location"] = v["path"]
                 del v["path"]
-            if "location" in v and not v["location"].startswith("keep:"):
-                v["location"] = merged_map[cur_id].resolved[v["location"]]
-            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
-                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
-                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+                                                                                                             runtimeContext.project_uuid,
+                                                                                                             runtimeContext.force_docker_pull,
+                                                                                                             runtimeContext.tmp_outdir_prefix,
+                                                                                                             runtimeContext.match_local_docker,
+                                                                                                             runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
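
Note: for v1.1+ documents an embedded process may legitimately lack an "id", so cur_id can be unset; the location rewrite is therefore guarded by membership tests before consulting merged_map. A sketch with toy data:

    from collections import namedtuple
    FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

    merged_map = {"#main": FileUpdates(
        resolved={"file:///home/me/ref.fa": "keep:pdh+1/ref.fa"},
        secondaryFiles={})}

    v = {"location": "file:///home/me/ref.fa"}
    cur_id = "#main"
    if "location" in v and cur_id in merged_map:
        if v["location"] in merged_map[cur_id].resolved:
            v["location"] = merged_map[cur_id].resolved[v["location"]]
    assert v["location"] == "keep:pdh+1/ref.fa"
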
@@ -466,7 +593,7 @@ def tag_git_version(packed):
             packed["http://schema.org/version"] = githash
 
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
@@ -489,7 +616,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
     builder = make_builder(builder_job_order,
                            tool.hints,
                            tool.requirements,
-                           ArvRuntimeContext())
+                           ArvRuntimeContext(),
+                           tool.metadata)
     # Now update job_order with secondaryFiles
     discover_secondary_files(arvrunner.fs_access,
                              builder,
@@ -501,7 +629,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    False,
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
@@ -515,10 +644,10 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
@@ -533,6 +662,7 @@ def upload_workflow_deps(arvrunner, tool):
                                      deptool,
                                      deptool["id"],
                                      False,
+                                     runtimeContext,
                                      include_primary=False,
                                      discovered_secondaryfiles=discovered_secondaryfiles)
             document_loader.idx[deptool["id"]] = deptool
@@ -545,16 +675,22 @@ def upload_workflow_deps(arvrunner, tool):
 
     return merged_map
 
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True,
+                                                          runtimeContext.project_uuid,
+                                                          runtimeContext.force_docker_pull,
+                                                          runtimeContext.tmp_outdir_prefix,
+                                                          runtimeContext.match_local_docker,
+                                                          runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
 
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
     collection = arvados.collection.Collection(api_client=arvrunner.api,
                                                keep_client=arvrunner.keep_client,
                                                num_retries=arvrunner.num_retries)
@@ -563,15 +699,15 @@ def upload_workflow_collection(arvrunner, name, packed):
 
     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
                ["name", "like", name+"%"]]
-    if arvrunner.project_uuid:
-        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    if runtimeContext.project_uuid:
+        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
 
     if exists["items"]:
         logger.info("Using collection %s", exists["items"][0]["uuid"])
     else:
         collection.save_new(name=name,
-                            owner_uuid=arvrunner.project_uuid,
+                            owner_uuid=runtimeContext.project_uuid,
                             ensure_unique_name=True,
                             num_retries=arvrunner.num_retries)
         logger.info("Uploaded to %s", collection.manifest_locator())