X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b420ec626f8cb5cd7a8b4252dfc2be76ba3ba844..9f42cb85807ebad098aaf6e0ab3218f763b712e2:/sdk/cwl/arvados_cwl/runner.py?ds=sidebyside

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index dc6d0df3f1..54af2be517 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -69,7 +69,7 @@ from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
 from ._version import __version__
 from . import done
 from .context import ArvRuntimeContext
@@ -447,17 +447,18 @@ def upload_dependencies(arvrunner, name, document_loader,
                            single_collection=True,
                            optional_deps=optional_deps)
 
-    print("MMM", mapper._pathmap)
+    for k, v in uuid_map.items():
+        mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
 
     keeprefs = set()
     def addkeepref(k):
         if k.startswith("keep:"):
             keeprefs.add(collection_pdh_pattern.match(k).group(1))
 
-    def setloc(p):
+
+    def collectloc(p):
         loc = p.get("location")
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
-            p["location"] = mapper.mapper(p["location"]).resolved
             addkeepref(p["location"])
             return
 
@@ -488,12 +489,10 @@ def upload_dependencies(arvrunner, name, document_loader,
         if uuid not in uuid_map:
             raise SourceLine(p, "location", validate.ValidationException).makeError(
                 "Collection uuid %s not found" % uuid)
-        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
-        p[collectionUUID] = uuid
 
-    #with Perf(metrics, "setloc"):
-    #    visit_class(workflowobj, ("File", "Directory"), setloc)
-    #    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "collectloc"):
+        visit_class(workflowobj, ("File", "Directory"), collectloc)
+        visit_class(discovered, ("File", "Directory"), collectloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -518,7 +517,6 @@ def upload_dependencies(arvrunner, name, document_loader,
                 continue
             col = col["items"][0]
             col["name"] = arvados.util.trim_name(col["name"])
-            print("CCC name", col["name"])
            try:
                 arvrunner.api.collections().create(body={"collection": {
                     "owner_uuid": runtimeContext.project_uuid,
@@ -553,20 +551,10 @@ def upload_docker(arvrunner, tool, runtimeContext):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
         else:
             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                       True,
-                                                       runtimeContext.project_uuid,
-                                                       runtimeContext.force_docker_pull,
-                                                       runtimeContext.tmp_outdir_prefix,
-                                                       runtimeContext.match_local_docker,
-                                                       runtimeContext.copy_deps)
+                                                       True, runtimeContext)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
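
Note: the upload_docker hunk above, and the packed_workflow and arvados_jobs_image hunks below, all make the same simplification: the five runtimeContext.* values previously threaded into arv_docker_get_image as separate positional arguments are replaced by the runtimeContext object itself. A minimal sketch of the pattern follows; everything in it is a hypothetical stand-in except the attribute names, which come from the old call sites.

    from dataclasses import dataclass

    @dataclass
    class ContextStub:
        # Stand-in for ArvRuntimeContext; only the attributes the old
        # call sites unpacked positionally are shown here.
        project_uuid: str = ""
        force_docker_pull: bool = False
        tmp_outdir_prefix: str = ""
        match_local_docker: bool = False
        copy_deps: bool = False

    def get_image(api, docker_req, pull_image, runtimeContext):
        # The callee reads what it needs from the context instead of
        # taking each field as a separate positional argument.
        return (docker_req.get("dockerPull"), runtimeContext.project_uuid)

    print(get_image(None, {"dockerPull": "example/image:tag"}, True, ContextStub()))
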
@@ -601,11 +589,7 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                             runtimeContext.project_uuid,
-                                                                                                             runtimeContext.force_docker_pull,
-                                                                                                             runtimeContext.tmp_outdir_prefix,
-                                                                                                             runtimeContext.match_local_docker,
-                                                                                                             runtimeContext.copy_deps)
+                                                                                                             runtimeContext)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -630,6 +614,73 @@ def tag_git_version(packed):
     else:
         packed["http://schema.org/version"] = githash
 
+def setloc(mapper, p):
+    loc = p.get("location")
+    if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+        p["location"] = mapper.mapper(p["location"]).resolved
+        return
+
+    if not loc:
+        return
+
+    if collectionUUID in p:
+        uuid = p[collectionUUID]
+        keepuuid = "keep:"+uuid
+        if keepuuid not in mapper:
+            raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
+        gp = collection_pdh_pattern.match(loc)
+        if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
+            # This file entry has both collectionUUID and a PDH
+            # location. If the PDH doesn't match the one returned by
+            # the API server, raise an error.
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Expected collection uuid %s to be %s but API server reported %s" % (
+                    uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
+
+    gp = collection_uuid_pattern.match(loc)
+    if not gp:
+        # Not a uuid pattern (must be a pdh pattern)
+        return
+
+    uuid = gp.groups()[0]
+    keepuuid = "keep:"+uuid
+    if keepuuid not in mapper:
+        raise SourceLine(p, "location", validate.ValidationException).makeError(
+            "Collection uuid %s not found" % uuid)
+    p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
+    p[collectionUUID] = uuid
+
+def update_from_mapper(workflowobj, mapper):
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
+
+def apply_merged_map(merged_map, workflowobj):
+    def visit(v, cur_id):
+        if isinstance(v, dict):
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if "id" in v:
+                    cur_id = v["id"]
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            #if v.get("class") == "DockerRequirement":
+            #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+            #                                                                                                 runtimeContext)
+            for l in v:
+                visit(v[l], cur_id)
+        if isinstance(v, list):
+            for l in v:
+                visit(l, cur_id)
+    visit(workflowobj, None)
+
+def update_from_merged_map(tool, merged_map):
+    tool.visit(partial(apply_merged_map, merged_map))
 
 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
@@ -680,16 +731,17 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     if "job_order" in job_order:
         del job_order["job_order"]
 
-    return job_order
+    update_from_mapper(job_order, jobmapper)
+
+    return job_order, jobmapper
 
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    # commented out for testing only, uncomment me
-    #with Perf(metrics, "upload_docker"):
-    #    upload_docker(arvrunner, tool, runtimeContext)
+    with Perf(metrics, "upload_docker"):
+        upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
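
Note: the new module-level setloc added above is applied to every File and Directory node through cwltool's visit_class together with functools.partial (see update_from_mapper). A self-contained toy version of that traversal pattern, with a plain dict standing in for the real ArvPathMapper:

    from functools import partial

    def visit_class(obj, cls, op):
        # Simplified stand-in for cwltool.utils.visit_class: walk nested
        # dicts/lists and call op() on every dict whose "class" is in cls.
        if isinstance(obj, dict):
            if obj.get("class") in cls:
                op(obj)
            for v in obj.values():
                visit_class(v, cls, op)
        elif isinstance(obj, list):
            for v in obj:
                visit_class(v, cls, op)

    def toy_setloc(mapper, p):
        # Like the real setloc, rewrite "location" unless it is a "_:"
        # literal or already a "keep:" URI; the mapper here is a plain dict.
        loc = p.get("location", "")
        if loc.startswith("_:") or loc.startswith("keep:"):
            return
        if loc in mapper:
            p["location"] = mapper[loc]

    toy_mapper = {"file.txt": "keep:99999999999999999999999999999999+99/file.txt"}
    job = {"class": "File", "location": "file.txt"}
    visit_class(job, ("File", "Directory"), partial(toy_setloc, toy_mapper))
    assert job["location"].startswith("keep:")
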
@@ -720,11 +772,11 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
                                  discovered_secondaryfiles=discovered_secondaryfiles,
                                  cache=tool_dep_cache)
 
-        print("PM", pm.items())
         document_loader.idx[deptool["id"]] = deptool
         toolmap = {}
         for k,v in pm.items():
             toolmap[k] = v.resolved
+
         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
 
     return merged_map
@@ -734,12 +786,7 @@ def arvados_jobs_image(arvrunner, img, runtimeContext):
 
     try:
         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
-                                                          True,
-                                                          runtimeContext.project_uuid,
-                                                          runtimeContext.force_docker_pull,
-                                                          runtimeContext.tmp_outdir_prefix,
-                                                          runtimeContext.match_local_docker,
-                                                          runtimeContext.copy_deps)
+                                                          True, runtimeContext)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
 
@@ -773,7 +820,7 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, updated_tool,
+    def __init__(self, runner, tool,
                  loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
@@ -783,10 +830,9 @@ class Runner(Process):
                  collection_cache_is_default=True,
                  git_info=None):
 
-        loadingContext = loadingContext.copy()
-        loadingContext.metadata = updated_tool.metadata.copy()
+        self.loadingContext = loadingContext.copy()
 
-        super(Runner, self).__init__(updated_tool.tool, loadingContext)
+        super(Runner, self).__init__(tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool
@@ -798,6 +844,9 @@ class Runner(Process):
         reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
         if reuse_req:
             enable_reuse = reuse_req["enableReuse"]
+        reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
+        if reuse_req:
+            enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
         self.uuid = None
         self.final_output = None
@@ -809,9 +858,9 @@ class Runner(Process):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
-        self.enable_dev = loadingContext.enable_dev
+        self.enable_dev = self.loadingContext.enable_dev
         self.git_info = git_info
-        self.fast_parser = loadingContext.fast_parser
+        self.fast_parser = self.loadingContext.fast_parser
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
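
Note: the Runner.__init__ hunk above adds a second requirement lookup so the standard CWL WorkReuse requirement is honored alongside the Arvados-specific ReuseRequirement extension; because WorkReuse is checked second, it takes precedence when both are present. A sketch of that ordering, with a stub standing in for embedded_tool.get_requirement (which, as in the diff, returns a tuple):

    def effective_enable_reuse(get_requirement, enable_reuse=True):
        # Arvados extension, checked first.
        reuse_req, _ = get_requirement("http://arvados.org/cwl#ReuseRequirement")
        if reuse_req:
            enable_reuse = reuse_req["enableReuse"]
        # Standard CWL WorkReuse, checked second, so it wins if both exist.
        reuse_req, _ = get_requirement("WorkReuse")
        if reuse_req:
            enable_reuse = reuse_req["enableReuse"]
        return enable_reuse

    # Stub lookup: only WorkReuse is present, disabling reuse.
    reqs = {"WorkReuse": {"class": "WorkReuse", "enableReuse": False}}
    lookup = lambda name: (reqs.get(name), False)
    assert effective_enable_reuse(lookup) is False
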
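Note: upload_job_order now applies the path mapper to the input object via update_from_mapper and returns a (job_order, jobmapper) pair instead of the bare job_order, so callers written against the old single-value return must unpack both. An illustrative, purely hypothetical call site with a stub in place of the real function:

    def upload_job_order_stub(arvrunner, name, tool, job_order, runtimeContext):
        jobmapper = object()  # stand-in for the ArvPathMapper built internally
        return job_order, jobmapper

    job_order, jobmapper = upload_job_order_stub(None, "wf", None, {"n": 1}, None)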