from ruamel.yaml.comments import CommentedMap, CommentedSeq
import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run, runtimeContext,
+ workflowobj, uri, runtimeContext,
include_primary=True, discovered_secondaryfiles=None,
cache=None):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
does an in-place update of references in "workflowobj".
- Use scandeps to find $import, $include, $schemas, run, File and Directory
+ Use scandeps to find $schemas, File and Directory
fields that represent external references.
- If workflowobj has an "id" field, this will reload the document to ensure
- it is scanning the raw document prior to preprocessing.
"""
- loaded = set()
- def loadref(b, u):
- joined = document_loader.fetcher.urljoin(b, u)
- defrg, _ = urllib.parse.urldefrag(joined)
- if defrg not in loaded:
- loaded.add(defrg)
- if cache is not None and defrg in cache:
- return cache[defrg]
- # Use fetch_text to get raw file (before preprocessing).
- text = document_loader.fetch_text(defrg)
- if isinstance(text, bytes):
- textIO = StringIO(text.decode('utf-8'))
- else:
- textIO = StringIO(text)
- yamlloader = YAML(typ='safe', pure=True)
- result = yamlloader.load(textIO)
- if cache is not None:
- cache[defrg] = result
- return result
- else:
- return {}
-
- if loadref_run:
- loadref_fields = set(("$import", "run"))
- else:
- loadref_fields = set(("$import",))
-
scanobj = workflowobj
- if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
- defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
- if cache is not None and defrg not in cache:
- # if we haven't seen this file before, want raw file
- # content (before preprocessing) to ensure that external
- # references like $include haven't already been inlined.
- scanobj = loadref("", workflowobj["id"])
-
metadata = scanobj
- with Perf(metrics, "scandeps include, location"):
+ with Perf(metrics, "scandeps"):
sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin,
+ set(),
+ set(("location",)),
+ None, urljoin=document_loader.fetcher.urljoin,
nestdirs=False)
-
- with Perf(metrics, "scandeps $schemas"):
optional_deps = scandeps(uri, scanobj,
- loadref_fields,
- set(("$schemas",)),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ set(),
+ set(("$schemas",)),
+ None, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
if sc_result is None:
sc_result = []
single_collection=True,
optional_deps=optional_deps)
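+ # uuid_map maps collection UUIDs to portable data hashes; registering
+ # "keep:<uuid>" entries lets the pathmapper resolve UUID-style references.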
+ for k, v in uuid_map.items():
+ mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
+
keeprefs = set()
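+ # portable data hashes of every referenced collection, so they can be
+ # copied into the destination project below if necessary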
def addkeepref(k):
if k.startswith("keep:"):
keeprefs.add(collection_pdh_pattern.match(k).group(1))
- def setloc(p):
+
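+ # Gather the keep: references used by the workflow; rewriting locations
+ # in place is deferred to update_from_mapper().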
+ def collectloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
- p["location"] = mapper.mapper(p["location"]).resolved
- addkeepref(p["location"])
+ addkeepref(mapper.mapper(p["location"]).resolved)
return
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
"Collection uuid %s not found" % uuid)
- p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
- p[collectionUUID] = uuid
- with Perf(metrics, "setloc"):
- visit_class(workflowobj, ("File", "Directory"), setloc)
- visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "collectloc"):
+ visit_class(workflowobj, ("File", "Directory"), collectloc)
+ visit_class(discovered, ("File", "Directory"), collectloc)
if discovered_secondaryfiles is not None:
for d in discovered:
logger.warning("Cannot find collection with portable data hash %s", kr)
continue
col = col["items"][0]
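+ # trim the collection name to fit the API server's name length limit
+ # (assumed behavior of arvados.util.trim_name)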
+ col["name"] = arvados.util.trim_name(col["name"])
try:
arvrunner.api.collections().create(body={"collection": {
"owner_uuid": runtimeContext.project_uuid,
"trash_at": col["trash_at"]
}}, ensure_unique_name=True).execute()
except Exception as e:
- logger.warning("Unable copy collection to destination: %s", e)
+ logger.warning("Unable to copy collection to destination: %s", e)
if "$schemas" in workflowobj:
sch = CommentedSeq()
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ runtimeContext)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
else:
packed["http://schema.org/version"] = githash
+def setloc(mapper, p):
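+ """Rewrite a File/Directory "location" to its resolved keep: reference,
+ validating any collectionUUID hint against the pathmapper."""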
+ loc = p.get("location")
+ if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+ p["location"] = mapper.mapper(p["location"]).resolved
+ return
+
+ if not loc:
+ return
+
+ if collectionUUID in p:
+ uuid = p[collectionUUID]
+ keepuuid = "keep:"+uuid
+ if keepuuid not in mapper:
+ raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ gp = collection_pdh_pattern.match(loc)
+ if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
+ # This file entry has both collectionUUID and a PDH
+ # location. If the PDH doesn't match the one returned by
+ # the API server, raise an error.
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Expected collection uuid %s to be %s but API server reported %s" % (
+ uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
+
+ gp = collection_uuid_pattern.match(loc)
+ if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ return
+
+ uuid = gp.groups()[0]
+ keepuuid = "keep:"+uuid
+ if keepuuid not in mapper:
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
+ p[collectionUUID] = uuid
+
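+# Walk a document and rewrite every File/Directory location using the
+# pathmapper returned by upload_dependencies().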
+def update_from_mapper(workflowobj, mapper):
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
+
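+# Apply the resolved locations and discovered secondaryFiles recorded in
+# merged_map to a single tool document.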
+def apply_merged_map(merged_map, workflowobj):
+ def visit(v, cur_id):
+ if isinstance(v, dict):
+ if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+ if "id" in v:
+ cur_id = v["id"]
+ if "path" in v and "location" not in v:
+ v["location"] = v["path"]
+ del v["path"]
+ if "location" in v and cur_id in merged_map:
+ if v["location"] in merged_map[cur_id].resolved:
+ v["location"] = merged_map[cur_id].resolved[v["location"]]
+ if v["location"] in merged_map[cur_id].secondaryFiles:
+ v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+ #if v.get("class") == "DockerRequirement":
+ # v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+ # runtimeContext)
+ for l in v:
+ visit(v[l], cur_id)
+ if isinstance(v, list):
+ for l in v:
+ visit(l, cur_id)
+ visit(workflowobj, None)
+
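+# Visit every embedded tool document in the loaded workflow and apply merged_map.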
+def update_from_merged_map(tool, merged_map):
+ tool.visit(partial(apply_merged_map, merged_map))
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
jobloader,
job_order,
job_order.get("id", "#"),
- False,
runtimeContext)
if "id" in job_order:
if "job_order" in job_order:
del job_order["job_order"]
- return job_order
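+ # rewrite the job order's locations to the uploaded keep: references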
+ update_from_mapper(job_order, jobmapper)
+
+ return job_order, jobmapper
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
document_loader,
deptool,
deptool["id"],
- False,
runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles,
cache=tool_dep_cache)
+
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():
toolmap[k] = v.resolved
+
merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
return merged_map
try:
return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
- def __init__(self, runner, updated_tool,
+ def __init__(self, runner,
tool, loadingContext, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
collection_cache_is_default=True,
git_info=None):
- loadingContext = loadingContext.copy()
- loadingContext.metadata = updated_tool.metadata.copy()
+ self.loadingContext = loadingContext.copy()
- super(Runner, self).__init__(updated_tool.tool, loadingContext)
+ super(Runner, self).__init__(tool.tool, self.loadingContext)
self.arvrunner = runner
self.embedded_tool = tool
reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
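+ # the standard WorkReuse requirement (CWL v1.1+) overrides the
+ # Arvados-specific ReuseRequirement extension when both are present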
+ reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
+ if reuse_req:
+ enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
self.uuid = None
self.final_output = None
self.intermediate_output_ttl = intermediate_output_ttl
self.priority = priority
self.secret_store = secret_store
- self.enable_dev = loadingContext.enable_dev
+ self.enable_dev = self.loadingContext.enable_dev
self.git_info = git_info
+ self.fast_parser = self.loadingContext.fast_parser
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # default 1 GiB