Merge branch '20241-validate-ssh-keys'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 026eb30995260e1ed2daed34414979ec42a78368..54af2be5173eb0b3ea9b4a843d9344f6d129ece0 100644 (file)
@@ -69,7 +69,7 @@ from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
@@ -447,6 +447,9 @@ def upload_dependencies(arvrunner, name, document_loader,
                                single_collection=True,
                                optional_deps=optional_deps)
 
+    for k, v in uuid_map.items():
+        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
+
     keeprefs = set()
     def addkeepref(k):
         if k.startswith("keep:"):
@@ -586,11 +589,7 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             runtimeContext.project_uuid,
-                                                                                                             runtimeContext.force_docker_pull,
-                                                                                                             runtimeContext.tmp_outdir_prefix,
-                                                                                                             runtimeContext.match_local_docker,
-                                                                                                             runtimeContext.copy_deps)
+                                                                                                             runtimeContext)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -626,17 +625,18 @@ def setloc(mapper, p):
 
     if collectionUUID in p:
         uuid = p[collectionUUID]
-        if uuid not in uuid_map:
+        keepuuid = "keep:"+uuid
+        if keepuuid not in mapper:
             raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                 "Collection uuid %s not found" % uuid)
         gp = collection_pdh_pattern.match(loc)
-        if gp and uuid_map[uuid] != gp.groups()[0]:
+        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
             # This file entry has both collectionUUID and a PDH
             # location. If the PDH doesn't match the one returned
             # the API server, raise an error.
             raise SourceLine(p, "location", validate.ValidationException).makeError(
                 "Expected collection uuid %s to be %s but API server reported %s" % (
-                    uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
 
     gp = collection_uuid_pattern.match(loc)
     if not gp:
@@ -644,17 +644,44 @@ def setloc(mapper, p):
         return
 
     uuid = gp.groups()[0]
-    if uuid not in uuid_map:
+    keepuuid = "keep:"+uuid
+    if keepuuid not in mapper:
         raise SourceLine(p, "location", validate.ValidationException).makeError(
             "Collection uuid %s not found" % uuid)
-    p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
     p[collectionUUID] = uuid
 
-
 def update_from_mapper(workflowobj, mapper):
     with Perf(metrics, "setloc"):
         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
 
+def apply_merged_map(merged_map, workflowobj):
+    """Walk workflowobj and rewrite "location"/"secondaryFiles" in place from merged_map."""
+    def visit(v, cur_id):  # cur_id: "id" of the nearest enclosing tool/workflow dict, or None
+        if isinstance(v, dict):
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if "id" in v:
+                    cur_id = v["id"]
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            # NOTE: DockerRequirement entries are not modified by this walk.
+            for l in v:
+                visit(v[l], cur_id)
+        if isinstance(v, list):
+            for l in v:
+                visit(l, cur_id)
+    visit(workflowobj, None)
+
+def update_from_merged_map(tool, merged_map):
+    """Run apply_merged_map(merged_map, obj) on each obj that tool.visit() hands to its callback."""
+    tool.visit(partial(apply_merged_map, merged_map))
+
 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
@@ -706,14 +733,13 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
 
     update_from_mapper(job_order, jobmapper)
 
-    return job_order
+    return job_order, jobmapper
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    # commented out for testing only, uncomment me
     with Perf(metrics, "upload_docker"):
         upload_docker(arvrunner, tool, runtimeContext)
 
@@ -750,6 +776,7 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
         toolmap = {}
         for k,v in pm.items():
             toolmap[k] = v.resolved
+
         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
 
     return merged_map
@@ -793,7 +820,7 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, updated_tool,
+    def __init__(self, runner,
                  tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
@@ -804,9 +831,8 @@ class Runner(Process):
                  git_info=None):
 
         self.loadingContext = loadingContext.copy()
-        self.loadingContext.metadata = updated_tool.metadata.copy()
 
-        super(Runner, self).__init__(updated_tool.tool, loadingContext)
+        super(Runner, self).__init__(tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool