From 022095999a584b12789259b577965122bb676194 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Wed, 12 Oct 2016 10:59:05 -0400
Subject: [PATCH] 10221: Always upload all dependencies up front. Ensures consistent reuse behavior between --local and --submit. Fix pathmapping bugs where previously-uploaded files were added to path map but not actually mapped to target paths.

---
 sdk/cwl/arvados_cwl/__init__.py   |  6 +++-
 sdk/cwl/arvados_cwl/pathmapper.py |  8 +++--
 sdk/cwl/arvados_cwl/runner.py     | 49 ++++++++++++++++---------------
 3 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c90f890268..c8ba5a4f50 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -24,7 +24,7 @@ import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner
+from. runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
@@ -64,6 +64,8 @@ class ArvCwlRunner(object):
         self.pipeline = None
         self.final_output_collection = None
         self.output_name = output_name
+        self.project_uuid = None
+
         if keep_client is not None:
             self.keep_client = keep_client
         else:
@@ -266,6 +268,8 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
         runnerjob = None
         if kwargs.get("submit"):
             if self.work_api == "containers":
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 228d43304a..5051d977b4 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -41,7 +41,7 @@ class ArvPathMapper(PathMapper):
                 if isinstance(st, arvados.commands.run.UploadFile):
                     uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+                    self._pathmap[src] = MapperEnt("keep:"+ab, self.collection_pattern % ab, "File")
                 elif src.startswith("_:"):
                     if "contents" in srcobj:
                         pass
@@ -78,9 +78,11 @@ class ArvPathMapper(PathMapper):
 
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
-        self._pathmap = self.arvrunner.get_uploaded()
         uploadfiles = set()
 
+        for k,v in self.arvrunner.get_uploaded().iteritems():
+            self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
 
@@ -94,7 +96,7 @@ class ArvPathMapper(PathMapper):
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
+            self._pathmap[src] = MapperEnt("keep:" + st.keepref, self.collection_pattern % st.keepref, "File")
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index e5b4e006e8..054d3530cf 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -112,6 +112,30 @@ def upload_docker(arvrunner, tool):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
+def upload_instance(arvrunner, name, tool, job_order):
+    upload_docker(arvrunner, tool)
+
+    workflowmapper = upload_dependencies(arvrunner,
+                                         name,
+                                         tool.doc_loader,
+                                         tool.tool,
+                                         tool.tool["id"],
+                                         True)
+
+    jobmapper = upload_dependencies(arvrunner,
+                                    os.path.basename(job_order.get("id", "#")),
+                                    tool.doc_loader,
+                                    job_order,
+                                    job_order.get("id", "#"),
+                                    False)
+
+    adjustDirObjs(job_order, trim_listing)
+
+    if "id" in job_order:
+        del job_order["id"]
+
+    return workflowmapper
+
 
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse, output_name):
@@ -128,31 +152,8 @@ class Runner(object):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        upload_docker(self.arvrunner, self.tool)
-
         self.name = os.path.basename(self.tool.tool["id"])
-
-        workflowmapper = upload_dependencies(self.arvrunner,
-                                             self.name,
-                                             self.tool.doc_loader,
-                                             self.tool.tool,
-                                             self.tool.tool["id"],
-                                             True)
-
-        jobmapper = upload_dependencies(self.arvrunner,
-                                        os.path.basename(self.job_order.get("id", "#")),
-                                        self.tool.doc_loader,
-                                        self.job_order,
-                                        self.job_order.get("id", "#"),
-                                        False)
-
-        adjustDirObjs(self.job_order, trim_listing)
-
-        if "id" in self.job_order:
-            del self.job_order["id"]
-
-        return workflowmapper
-
+        return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
 
     def done(self, record):
         if record["state"] == "Complete":
-- 
2.30.2