19070: Prefer to use the passed-in runtimeContext
author Peter Amstutz <peter.amstutz@curii.com>
Wed, 11 May 2022 21:19:51 +0000 (17:19 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Wed, 11 May 2022 21:27:13 +0000 (17:27 -0400)
Code cleanup: change most places to use the passed-in runtimeContext
instead of the ArvRunner top-level runtimeContext.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>
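
The pattern is the same throughout: helpers stop reaching back to the
runner's single top-level context and instead receive the caller's
context as a parameter, so each invocation can carry its own settings
(project_uuid, copy_deps, and so on). A minimal sketch of the idea,
with illustrative names rather than the actual Arvados functions:

    # Before: the helper reads shared state off the runner object.
    def upload_deps_before(runner, doc):
        return runner.runtimeContext.project_uuid

    # After: the caller's runtimeContext is threaded through explicitly,
    # so a copied or modified context takes effect inside the helper.
    def upload_deps_after(runner, doc, runtimeContext):
        return runtimeContext.project_uuid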

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_submit.py

index 3faa510a0a854cc2b1d713345a745a0eac21d677..21b629f37ab9b50812415ecdad4a766345b28f32 100644 (file)
@@ -219,7 +219,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
-    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave depenencies where they are.")
+    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
 
     parser.add_argument(
         "--skip-schemas",
@@ -367,5 +367,5 @@ def main(args=sys.argv[1:],
                              logger_handler=arvados.log_handler,
                              custom_schema_callback=add_arv_hints,
                              loadingContext=executor.loadingContext,
-                             runtimeContext=executor.runtimeContext,
+                             runtimeContext=executor.toplevel_runtimeContext,
                              input_required=not (arvargs.create_workflow or arvargs.update_workflow))
index 33b4c90c617711f6c2505bda1be35e93ed74137f..5082cc2f4b57eacd0934019099509c2f42c7493b 100644 (file)
@@ -466,7 +466,7 @@ class RunnerContainer(Runner):
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
-            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
@@ -501,7 +501,7 @@ class RunnerContainer(Runner):
                 "portable_data_hash": "%s" % workflowcollection
             }
         else:
                 "portable_data_hash": "%s" % workflowcollection
             }
         else:
-            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
@@ -551,17 +551,17 @@ class RunnerContainer(Runner):
         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
 
-        if self.on_error:
+        if runtimeContext.on_error:
             command.append("--on-error=" + self.on_error)
 
             command.append("--on-error=" + self.on_error)
 
-        if self.intermediate_output_ttl:
-            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+        if runtimeContext.intermediate_output_ttl:
+            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
 
-        if self.arvrunner.trash_intermediate:
+        if runtimeContext.trash_intermediate:
             command.append("--trash-intermediate")
 
             command.append("--trash-intermediate")
 
-        if self.arvrunner.project_uuid:
-            command.append("--project-uuid="+self.arvrunner.project_uuid)
+        if runtimeContext.project_uuid:
+            command.append("--project-uuid="+runtimeContext.project_uuid)
 
         if self.enable_dev:
             command.append("--enable-dev")
@@ -582,8 +582,8 @@ class RunnerContainer(Runner):
     def run(self, runtimeContext):
         runtimeContext.keepprefix = "keep:"
         job_spec = self.arvados_job_spec(runtimeContext)
-        if self.arvrunner.project_uuid:
-            job_spec["owner_uuid"] = self.arvrunner.project_uuid
+        if runtimeContext.project_uuid:
+            job_spec["owner_uuid"] = runtimeContext.project_uuid
 
         extra_submit_params = {}
         if runtimeContext.submit_runner_cluster:
index 4fe82a6fe1d6fc32f709dd909577da7010970e07..8eb12913adb854894cc19d4a3dd8cca862ae9dc1 100644 (file)
@@ -37,11 +37,12 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
 sum_res_pars = ("outdirMin", "outdirMax")
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+                    runtimeContext, uuid=None,
                     submit_runner_ram=0, name=None, merged_map=None,
                     submit_runner_image=None):
 
-    packed = packed_workflow(arvRunner, tool, merged_map)
+    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
 
     adjustDirObjs(job_order, trim_listing)
     adjustFileObjs(job_order, trim_anonymous_location)
@@ -57,7 +58,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
     upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"], False)
+                        packed, tool.tool["id"], False,
+                        runtimeContext)
 
     wf_runner_resources = None
 
@@ -72,7 +74,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
         hints.append(wf_runner_resources)
 
         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
         hints.append(wf_runner_resources)
 
-    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                  submit_runner_image or "arvados/jobs:"+__version__,
+                                                                  runtimeContext)
 
     if submit_runner_ram:
         wf_runner_resources["ramMin"] = submit_runner_ram
@@ -194,7 +198,8 @@ class ArvadosWorkflow(Workflow):
                                 self.doc_loader,
                                 joborder,
                                 joborder.get("id", "#"),
-                                False)
+                                False,
+                                runtimeContext)
 
             if self.wf_pdh is None:
                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
@@ -237,7 +242,8 @@ class ArvadosWorkflow(Workflow):
                                     self.doc_loader,
                                     packed,
                                     self.tool["id"],
-                                    False)
+                                    False,
+                                    runtimeContext)
 
                 # Discover files/directories referenced by the
                 # workflow (mainly "default" values)
index 0daa15d5f0ac18aed8c7628f3b7483213b44fea1..70bc0c4572bbcf18c50006a61a85dea6b2d6957f 100644 (file)
@@ -197,11 +197,11 @@ The 'jobs' API is no longer supported.
             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
             root_logger.addHandler(handler)
 
-        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
-        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
+        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                      collection_cache=self.collection_cache)
 
-        validate_cluster_target(self, self.runtimeContext)
+        validate_cluster_target(self, self.toplevel_runtimeContext)
 
 
     def arv_make_tool(self, toolpath_object, loadingContext):
@@ -535,6 +535,8 @@ The 'jobs' API is no longer supported.
         if runtimeContext.submit_request_uuid and self.work_api != "containers":
             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
 
+        runtimeContext = runtimeContext.copy()
+
         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
         if runtimeContext.storage_classes == "default":
             runtimeContext.storage_classes = default_storage_classes
@@ -544,14 +546,14 @@ The 'jobs' API is no longer supported.
         if not runtimeContext.name:
             runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
 
-        if self.runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
             # When creating or updating workflow record, by default
             # always copy dependencies and ensure Docker images are up
             # to date.
-            self.runtimeContext.copy_deps = True
-            self.runtimeContext.match_local_docker = True
+            runtimeContext.copy_deps = True
+            runtimeContext.match_local_docker = True
 
-        if self.runtimeContext.update_workflow and self.project_uuid is None:
+        if runtimeContext.update_workflow and self.project_uuid is None:
             # If we are updating a workflow, make sure anything that
             # gets uploaded goes into the same parent project, unless
             # an alternate --project-uuid was provided.
@@ -560,7 +562,7 @@ The 'jobs' API is no longer supported.
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     updated_tool, job_order)
+                                     updated_tool, job_order, runtimeContext)
 
         # the last clause means: if it is a command line tool, and we
         # are going to wait for the result, and always_submit_runner
@@ -587,7 +589,7 @@ The 'jobs' API is no longer supported.
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        merged_map = upload_workflow_deps(self, tool)
+        merged_map = upload_workflow_deps(self, tool, runtimeContext)
 
         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been
@@ -602,12 +604,13 @@ The 'jobs' API is no longer supported.
             # Create a pipeline template or workflow record and exit.
             if self.work_api == "containers":
                 uuid = upload_workflow(self, tool, job_order,
-                                        self.project_uuid,
-                                        uuid=runtimeContext.update_workflow,
-                                        submit_runner_ram=runtimeContext.submit_runner_ram,
-                                        name=runtimeContext.name,
-                                        merged_map=merged_map,
-                                        submit_runner_image=runtimeContext.submit_runner_image)
+                                       self.project_uuid,
+                                       runtimeContext,
+                                       uuid=runtimeContext.update_workflow,
+                                       submit_runner_ram=runtimeContext.submit_runner_ram,
+                                       name=runtimeContext.name,
+                                       merged_map=merged_map,
+                                       submit_runner_image=runtimeContext.submit_runner_image)
                 self.stdout.write(uuid + "\n")
                 return (None, "success")
 
@@ -616,7 +619,6 @@ The 'jobs' API is no longer supported.
         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
         self.eval_timeout = runtimeContext.eval_timeout
 
-        runtimeContext = runtimeContext.copy()
         runtimeContext.use_container = True
         runtimeContext.tmpdir_prefix = "tmp"
         runtimeContext.work_api = self.work_api
index 10323c2be71fbe8dff186ca0d253ec6a896fccec..f39c98d8829ddad4c5e4002ce9d9e8e9693c1e62 100644 (file)
@@ -240,7 +240,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
+                        workflowobj, uri, loadref_run, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
@@ -449,12 +449,12 @@ def upload_dependencies(arvrunner, name, document_loader,
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
-    if arvrunner.runtimeContext.copy_deps:
+    if runtimeContext.copy_deps:
         # Find referenced collections and copy them into the
         # destination project, for easy sharing.
         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                      filters=[["portable_data_hash", "in", list(keeprefs)],
-                                              ["owner_uuid", "=", arvrunner.project_uuid]],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                      select=["uuid", "portable_data_hash", "created_at"]))
 
         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
                                      select=["uuid", "portable_data_hash", "created_at"]))
 
         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
@@ -469,7 +469,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             col = col["items"][0]
             try:
                 arvrunner.api.collections().create(body={"collection": {
             col = col["items"][0]
             try:
                 arvrunner.api.collections().create(body={"collection": {
-                    "owner_uuid": arvrunner.project_uuid,
+                    "owner_uuid": runtimeContext.project_uuid,
                     "name": col["name"],
                     "description": col["description"],
                     "properties": col["properties"],
                     "name": col["name"],
                     "description": col["description"],
                     "properties": col["properties"],
@@ -491,7 +491,7 @@ def upload_dependencies(arvrunner, name, document_loader,
     return mapper
 
 
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
@@ -501,24 +501,26 @@ def upload_docker(arvrunner, tool):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker,
-                                                       arvrunner.runtimeContext.copy_deps)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker,
-                                                       arvrunner.runtimeContext.copy_deps)
+                                                       True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -547,11 +549,11 @@ def packed_workflow(arvrunner, tool, merged_map):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             arvrunner.project_uuid,
-                                                                                                             arvrunner.runtimeContext.force_docker_pull,
-                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                                                                             arvrunner.runtimeContext.match_local_docker,
-                                                                                                             arvrunner.runtimeContext.copy_deps)
+                                                                                                             runtimeContext.project_uuid,
+                                                                                                             runtimeContext.force_docker_pull,
+                                                                                                             runtimeContext.tmp_outdir_prefix,
+                                                                                                             runtimeContext.match_local_docker,
+                                                                                                             runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -572,7 +574,7 @@ def tag_git_version(packed):
             packed["http://schema.org/version"] = githash
 
 
             packed["http://schema.org/version"] = githash
 
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
@@ -608,7 +610,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    False,
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
@@ -622,10 +625,10 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
@@ -640,6 +643,7 @@ def upload_workflow_deps(arvrunner, tool):
                                      deptool,
                                      deptool["id"],
                                      False,
+                                     runtimeContext,
                                      include_primary=False,
                                      discovered_secondaryfiles=discovered_secondaryfiles)
             document_loader.idx[deptool["id"]] = deptool
@@ -652,15 +656,17 @@ def upload_workflow_deps(arvrunner, tool):
 
     return merged_map
 
 
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
-                                                          arvrunner.runtimeContext.force_docker_pull,
-                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                          arvrunner.runtimeContext.match_local_docker,
-                                                          arvrunner.runtimeContext.copy_deps)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True,
+                                                          runtimeContext.project_uuid,
+                                                          runtimeContext.force_docker_pull,
+                                                          runtimeContext.tmp_outdir_prefix,
+                                                          runtimeContext.match_local_docker,
+                                                          runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
index ba0557a9c0457443cf009a8b8362958d66169b45..305d51e144e639004e986c59ef12028fcc45b7f0 100644 (file)
@@ -1117,7 +1117,7 @@ class TestSubmit(unittest.TestCase):
                                                                   "portable_data_hash": "9999999999999999999999999999999b+99"}
 
         self.assertEqual("9999999999999999999999999999999b+99",
                                                                   "portable_data_hash": "9999999999999999999999999999999b+99"}
 
         self.assertEqual("9999999999999999999999999999999b+99",
-                         arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+                         arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
 
 
     @stubs