def setloc(mapper, p):
    """Rewrite ``p["location"]`` in place using the resolutions held by *mapper*.

    *p* is a dict-like File/Directory entry.  *mapper* must provide
    ``mapper.mapper(key).resolved`` and support ``key in mapper``.

    * A missing or empty location leaves *p* untouched.
    * A location that starts with neither ``"_:"`` nor ``"keep:"`` is
      replaced by the mapper's resolved value for it.
    * Otherwise, when *p* carries a ``collectionUUID`` field, that uuid must
      be known to the mapper, and if the location matches
      ``collection_pdh_pattern`` its first group must equal what the mapper
      resolved for the uuid.
    * Finally, a location matching ``collection_uuid_pattern`` is rewritten
      to ``keep:<resolved><optional second group>`` and the matched uuid is
      stored under ``collectionUUID``.

    Raises the error built by
    ``SourceLine(..., validate.ValidationException).makeError(...)`` when a
    uuid is not in the mapper or the two identifiers disagree.
    """
    loc = p.get("location")
    if not loc:
        # No location to rewrite.
        return

    if not loc.startswith(("_:", "keep:")):
        # Neither a "_:" nor a "keep:" reference: take the mapper's
        # resolution for it as-is.
        p["location"] = mapper.mapper(p["location"]).resolved
        return

    if collectionUUID in p:
        declared_uuid = p[collectionUUID]
        declared_ref = "keep:" + declared_uuid
        if declared_ref not in mapper:
            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                "Collection uuid %s not found" % declared_uuid)

        pdh_match = collection_pdh_pattern.match(loc)
        if pdh_match:
            # The entry has both a collectionUUID and a PDH-style location.
            # If the PDH differs from the one the mapper resolved for the
            # uuid (per the error text, as reported by the API server),
            # refuse to continue.
            stated_pdh = pdh_match.group(1)
            reported_pdh = mapper.mapper(declared_ref).resolved
            if reported_pdh != stated_pdh:
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        declared_uuid, stated_pdh, reported_pdh))

    uuid_match = collection_uuid_pattern.match(loc)
    if not uuid_match:
        # Not a uuid-style location (so it must be a PDH one): leave as-is.
        return

    matched_uuid = uuid_match.group(1)
    # The second group is optional; treat "absent" and "empty" alike.
    remainder = uuid_match.group(2) or ""
    matched_ref = "keep:" + matched_uuid
    if matched_ref not in mapper:
        raise SourceLine(p, "location", validate.ValidationException).makeError(
            "Collection uuid %s not found" % matched_uuid)

    p["location"] = "keep:%s%s" % (mapper.mapper(matched_ref).resolved, remainder)
    p[collectionUUID] = matched_uuid
+
def update_from_mapper(workflowobj, mapper):
    """Run ``setloc`` with *mapper* over the File/Directory entries of *workflowobj*.

    The traversal is delegated to ``visit_class`` with the class names
    ``("File", "Directory")`` and happens inside a ``Perf(metrics, "setloc")``
    context (presumably a timing/metrics scope -- confirm against ``Perf``).
    """
    with Perf(metrics, "setloc"):
        relocate = partial(setloc, mapper)
        target_classes = ("File", "Directory")
        visit_class(workflowobj, target_classes, relocate)
+
def apply_merged_map(merged_map, workflowobj):
    """Rewrite locations throughout *workflowobj* in place from *merged_map*.

    *merged_map* is indexed by the ``id`` of the enclosing
    CommandLineTool / Workflow / ExpressionTool; each entry exposes two
    mappings, ``resolved`` (location -> new location) and ``secondaryFiles``
    (location -> secondaryFiles value).

    Nested dicts and lists are walked recursively.  For every dict:

    * if its ``class`` is one of the three process classes and it has an
      ``id``, that id becomes the current id for the dict and everything
      beneath it;
    * a ``path`` key is renamed to ``location`` when no ``location`` exists;
    * if the current id is in *merged_map*, ``location`` is swapped for its
      resolved value (when one exists) and ``secondaryFiles`` is set from
      the entry keyed by the resulting location (when one exists).

    Returns ``None``; all changes are made to *workflowobj* itself.
    """
    process_classes = ("CommandLineTool", "Workflow", "ExpressionTool")

    def visit(node, cur_id):
        if isinstance(node, dict):
            if node.get("class") in process_classes and "id" in node:
                cur_id = node["id"]

            if "path" in node and "location" not in node:
                node["location"] = node.pop("path")

            if "location" in node and cur_id in merged_map:
                entry = merged_map[cur_id]
                if node["location"] in entry.resolved:
                    node["location"] = entry.resolved[node["location"]]
                # Looked up with the (possibly just rewritten) location.
                if node["location"] in entry.secondaryFiles:
                    node["secondaryFiles"] = entry.secondaryFiles[node["location"]]

            # NOTE: stamping DockerRequirement entries with
            # "http://arvados.org/cwl#dockerCollectionPDH" is intentionally
            # not done here (it was disabled in the original code).

            for child in node.values():
                visit(child, cur_id)

        if isinstance(node, list):
            for item in node:
                visit(item, cur_id)

    visit(workflowobj, None)
+
def update_from_merged_map(tool, merged_map):
    """Hand ``tool.visit`` a callback that applies *merged_map*.

    The callback is ``apply_merged_map`` with *merged_map* pre-bound, so
    ``tool.visit`` invokes it with whatever object(s) it passes to its
    visitor (presumably the tool's document nodes -- confirm against the
    ``visit`` implementation of *tool*).
    """
    rewrite_locations = partial(apply_merged_map, merged_map)
    tool.visit(rewrite_locations)