X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc095ef1f8c86b05a8e58f15c4086a2caa861ec5..7ebb474e7b2ec5597a37253c71733ed361ec0872:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index aa8dac0536..54af2be517 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -655,6 +655,33 @@ def update_from_mapper(workflowobj, mapper): with Perf(metrics, "setloc"): visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper)) +def apply_merged_map(merged_map, workflowobj): + def visit(v, cur_id): + if isinstance(v, dict): + if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"): + if "id" in v: + cur_id = v["id"] + if "path" in v and "location" not in v: + v["location"] = v["path"] + del v["path"] + if "location" in v and cur_id in merged_map: + if v["location"] in merged_map[cur_id].resolved: + v["location"] = merged_map[cur_id].resolved[v["location"]] + if v["location"] in merged_map[cur_id].secondaryFiles: + v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] + #if v.get("class") == "DockerRequirement": + # v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, + # runtimeContext) + for l in v: + visit(v[l], cur_id) + if isinstance(v, list): + for l in v: + visit(l, cur_id) + visit(workflowobj, None) + +def update_from_merged_map(tool, merged_map): + tool.visit(partial(apply_merged_map, merged_map)) + def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): """Upload local files referenced in the input object and return updated input object with 'location' updated to the proper keep references. @@ -706,9 +733,7 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): update_from_mapper(job_order, jobmapper) - #print(json.dumps(job_order, indent=2)) - - return job_order + return job_order, jobmapper FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) @@ -751,6 +776,7 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext): toolmap = {} for k,v in pm.items(): toolmap[k] = v.resolved + merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles) return merged_map