from ruamel.yaml.comments import CommentedMap, CommentedSeq
import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
single_collection=True,
optional_deps=optional_deps)
- print("MMM", mapper._pathmap)
+ for k, v in uuid_map.items():
+ mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
keeprefs = set()
def addkeepref(k):
if k.startswith("keep:"):
keeprefs.add(collection_pdh_pattern.match(k).group(1))
- def setloc(p):
+
+ def collectloc(p):
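+ # collectloc only gathers keep references (via addkeepref); rewriting
+ # locations is handled later by setloc()/update_from_mapper().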
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
- p["location"] = mapper.mapper(p["location"]).resolved
addkeepref(p["location"])
return
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
"Collection uuid %s not found" % uuid)
- p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
- p[collectionUUID] = uuid
- #with Perf(metrics, "setloc"):
- # visit_class(workflowobj, ("File", "Directory"), setloc)
- # visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "collectloc"):
+ visit_class(workflowobj, ("File", "Directory"), collectloc)
+ visit_class(discovered, ("File", "Directory"), collectloc)
if discovered_secondaryfiles is not None:
for d in discovered:
continue
col = col["items"][0]
col["name"] = arvados.util.trim_name(col["name"])
- print("CCC name", col["name"])
try:
arvrunner.api.collections().create(body={"collection": {
"owner_uuid": runtimeContext.project_uuid,
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ runtimeContext)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
else:
packed["http://schema.org/version"] = githash
+def setloc(mapper, p):
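+ """Rewrite a File or Directory location using the path mapper.
+
+ Local (non-keep) locations are replaced with their mapped keep
+ references; entries carrying a collectionUUID are checked for
+ consistency against the mapper, and "keep:<uuid>" locations are
+ rewritten to their portable data hash form.
+ """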
+ loc = p.get("location")
+ if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+ p["location"] = mapper.mapper(p["location"]).resolved
+ return
+
+ if not loc:
+ return
+
+ if collectionUUID in p:
+ uuid = p[collectionUUID]
+ keepuuid = "keep:"+uuid
+ if keepuuid not in mapper:
+ raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ gp = collection_pdh_pattern.match(loc)
+ if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
+ # This file entry has both collectionUUID and a PDH
+ # location. If the PDH doesn't match the one returned by
+ # the API server, raise an error.
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Expected collection uuid %s to be %s but API server reported %s" % (
+ uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
+
+ gp = collection_uuid_pattern.match(loc)
+ if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ return
+
+ uuid = gp.groups()[0]
+ keepuuid = "keep:"+uuid
+ if keepuuid not in mapper:
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
+ p[collectionUUID] = uuid
+
+def update_from_mapper(workflowobj, mapper):
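+ """Rewrite File and Directory locations in workflowobj using the path mapper."""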
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
+
+def apply_merged_map(merged_map, workflowobj):
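+ """Update locations and secondaryFiles in workflowobj from the per-tool entries in merged_map."""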
+ def visit(v, cur_id):
+ if isinstance(v, dict):
+ if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+ if "id" in v:
+ cur_id = v["id"]
+ if "path" in v and "location" not in v:
+ v["location"] = v["path"]
+ del v["path"]
+ if "location" in v and cur_id in merged_map:
+ if v["location"] in merged_map[cur_id].resolved:
+ v["location"] = merged_map[cur_id].resolved[v["location"]]
+ if v["location"] in merged_map[cur_id].secondaryFiles:
+ v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+ #if v.get("class") == "DockerRequirement":
+ # v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+ # runtimeContext)
+ for l in v:
+ visit(v[l], cur_id)
+ if isinstance(v, list):
+ for l in v:
+ visit(l, cur_id)
+ visit(workflowobj, None)
+
+def update_from_merged_map(tool, merged_map):
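+ """Apply merged_map updates to every embedded tool via tool.visit()."""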
+ tool.visit(partial(apply_merged_map, merged_map))
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
if "job_order" in job_order:
del job_order["job_order"]
- return job_order
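+ # Rewrite File/Directory locations in the job order using the path mapper.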
+ update_from_mapper(job_order, jobmapper)
+
+ return job_order, jobmapper
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- # commented out for testing only, uncomment me
- #with Perf(metrics, "upload_docker"):
- # upload_docker(arvrunner, tool, runtimeContext)
+ with Perf(metrics, "upload_docker"):
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
discovered_secondaryfiles=discovered_secondaryfiles,
cache=tool_dep_cache)
- print("PM", pm.items())
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():
toolmap[k] = v.resolved
+
merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
return merged_map
try:
return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
- def __init__(self, runner, updated_tool,
+ def __init__(self, runner,
tool, loadingContext, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
collection_cache_is_default=True,
git_info=None):
- loadingContext = loadingContext.copy()
- loadingContext.metadata = updated_tool.metadata.copy()
+ self.loadingContext = loadingContext.copy()
- super(Runner, self).__init__(updated_tool.tool, loadingContext)
+ super(Runner, self).__init__(tool.tool, loadingContext)
self.arvrunner = runner
self.embedded_tool = tool
reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
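+ # Also honor the standard WorkReuse requirement (CWL v1.1+), not just
+ # the Arvados ReuseRequirement extension.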
+ reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
+ if reuse_req:
+ enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
self.uuid = None
self.final_output = None
self.intermediate_output_ttl = intermediate_output_ttl
self.priority = priority
self.secret_store = secret_store
- self.enable_dev = loadingContext.enable_dev
+ self.enable_dev = self.loadingContext.enable_dev
self.git_info = git_info
- self.fast_parser = loadingContext.fast_parser
+ self.fast_parser = self.loadingContext.fast_parser
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # default 1 GiB