19070: Prefer to use the passed-in runtimeContext
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 10323c2be71fbe8dff186ca0d253ec6a896fccec..f39c98d8829ddad4c5e4002ce9d9e8e9693c1e62 100644 (file)
@@ -240,7 +240,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
+                        workflowobj, uri, loadref_run, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
@@ -449,12 +449,12 @@ def upload_dependencies(arvrunner, name, document_loader,
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
-    if arvrunner.runtimeContext.copy_deps:
+    if runtimeContext.copy_deps:
         # Find referenced collections and copy them into the
         # destination project, for easy sharing.
         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                      filters=[["portable_data_hash", "in", list(keeprefs)],
-                                              ["owner_uuid", "=", arvrunner.project_uuid]],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                      select=["uuid", "portable_data_hash", "created_at"]))
 
         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
@@ -469,7 +469,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             col = col["items"][0]
             try:
                 arvrunner.api.collections().create(body={"collection": {
-                    "owner_uuid": arvrunner.project_uuid,
+                    "owner_uuid": runtimeContext.project_uuid,
                     "name": col["name"],
                     "description": col["description"],
                     "properties": col["properties"],
@@ -491,7 +491,7 @@ def upload_dependencies(arvrunner, name, document_loader,
     return mapper
 
 
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
@@ -501,24 +501,26 @@ def upload_docker(arvrunner, tool):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker,
-                                                       arvrunner.runtimeContext.copy_deps)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True, arvrunner.project_uuid,
-                                                       arvrunner.runtimeContext.force_docker_pull,
-                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                       arvrunner.runtimeContext.match_local_docker,
-                                                       arvrunner.runtimeContext.copy_deps)
+                                                       True,
+                                                       runtimeContext.project_uuid,
+                                                       runtimeContext.force_docker_pull,
+                                                       runtimeContext.tmp_outdir_prefix,
+                                                       runtimeContext.match_local_docker,
+                                                       runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -547,11 +549,11 @@ def packed_workflow(arvrunner, tool, merged_map):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             arvrunner.project_uuid,
-                                                                                                             arvrunner.runtimeContext.force_docker_pull,
-                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                                                                             arvrunner.runtimeContext.match_local_docker,
-                                                                                                             arvrunner.runtimeContext.copy_deps)
+                                                                                                             runtimeContext.project_uuid,
+                                                                                                             runtimeContext.force_docker_pull,
+                                                                                                             runtimeContext.tmp_outdir_prefix,
+                                                                                                             runtimeContext.match_local_docker,
+                                                                                                             runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -572,7 +574,7 @@ def tag_git_version(packed):
             packed["http://schema.org/version"] = githash
 
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
@@ -608,7 +610,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    False,
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
@@ -622,10 +625,10 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
@@ -640,6 +643,7 @@ def upload_workflow_deps(arvrunner, tool):
                                      deptool,
                                      deptool["id"],
                                      False,
+                                     runtimeContext,
                                      include_primary=False,
                                      discovered_secondaryfiles=discovered_secondaryfiles)
             document_loader.idx[deptool["id"]] = deptool
@@ -652,15 +656,17 @@ def upload_workflow_deps(arvrunner, tool):
 
     return merged_map
 
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
-                                                          arvrunner.runtimeContext.force_docker_pull,
-                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                          arvrunner.runtimeContext.match_local_docker,
-                                                          arvrunner.runtimeContext.copy_deps)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True,
+                                                          runtimeContext.project_uuid,
+                                                          runtimeContext.force_docker_pull,
+                                                          runtimeContext.tmp_outdir_prefix,
+                                                          runtimeContext.match_local_docker,
+                                                          runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )