From f72b0e8bcc350966ce54954711bed538c527eb00 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 30 Sep 2016 15:59:51 -0400 Subject: [PATCH] 10165: Add FinalOutputPathMapper. Delete basename/size/listing from output object because that information is captured by the enclosing Collection. Sort keys. --- sdk/cwl/arvados_cwl/__init__.py | 25 +++++++++++++++---------- sdk/cwl/arvados_cwl/pathmapper.py | 26 +++++++++++++++++++++----- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 0452486b2a..21d3019169 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -28,7 +28,7 @@ from .arvtool import ArvadosCommandTool from .arvworkflow import ArvadosWorkflow, upload_workflow from .fsaccess import CollectionFsAccess from .perf import Perf -from .pathmapper import InitialWorkDirPathMapper +from .pathmapper import FinalOutputPathMapper from cwltool.pack import pack from cwltool.process import shortname, UnsupportedRequirement @@ -176,8 +176,7 @@ class ArvCwlRunner(object): adjustDirObjs(outputObj, capture) adjustFileObjs(outputObj, capture) - generatemapper = InitialWorkDirPathMapper(files, "", "", - separateDirs=False) + generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False) final = arvados.collection.Collection() @@ -194,13 +193,19 @@ class ArvCwlRunner(object): logger.warn("While preparing output collection: %s", e) def rewrite(fileobj): - fileobj["location"] = generatemapper.mapper(fileobj["location"]) + fileobj["location"] = generatemapper.mapper(fileobj["location"]).target + if "basename" in fileobj: + del fileobj["basename"] + if "size" in fileobj: + del fileobj["size"] + if "listing" in fileobj: + del fileobj["listing"] - adjustDirObjs(outputObj, capture) - adjustFileObjs(outputObj, capture) + adjustDirObjs(outputObj, rewrite) + adjustFileObjs(outputObj, rewrite) with final.open("cwl.output.json", "w") as f: - json.dump(outputObj, f, indent=4) + json.dump(outputObj, f, sort_keys=True, indent=4) final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True) @@ -338,15 +343,15 @@ class ArvCwlRunner(object): if self.final_output is None: raise WorkflowException("Workflow did not return a result.") - if kwargs.get("compute_checksum"): - adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access)) - if kwargs.get("submit"): logger.info("Final output collection %s", runnerjob.final_output) else: self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])), self.final_output) + if kwargs.get("compute_checksum"): + adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access)) + return self.final_output diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index 0fd9a0ed33..bd71a13355 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -150,11 +150,11 @@ class InitialWorkDirPathMapper(PathMapper): def visit(self, obj, stagedir, basedir, copy=False): # type: (Dict[unicode, Any], unicode, unicode, bool) -> None + loc = obj["location"] if obj["class"] == "Directory": - self._pathmap[obj["location"]] = MapperEnt(obj["location"], stagedir, "Directory") + self._pathmap[loc] = MapperEnt(obj["location"], stagedir, "Directory") self.visitlisting(obj.get("listing", []), stagedir, basedir) elif obj["class"] == "File": - loc = obj["location"] if loc in self._pathmap: return tgt = os.path.join(stagedir, obj["basename"]) @@ -172,10 +172,26 @@ class InitialWorkDirPathMapper(PathMapper): # Go through each file and set the target to its own directory along # with any secondary files. - stagedir = self.stagedir - for fob in referenced_files: - self.visit(fob, stagedir, basedir) + self.visitlisting(referenced_files, self.stagedir, basedir) for path, (ab, tgt, type) in self._pathmap.items(): if type in ("File", "Directory") and ab.startswith("keep:"): self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type) + + +class FinalOutputPathMapper(PathMapper): + def visit(self, obj, stagedir, basedir, copy=False): + # type: (Dict[unicode, Any], unicode, unicode, bool) -> None + loc = obj["location"] + if obj["class"] == "Directory": + self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory") + elif obj["class"] == "File": + if loc in self._pathmap: + return + tgt = os.path.join(stagedir, obj["basename"]) + self._pathmap[loc] = MapperEnt(loc, tgt, "File") + self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir) + + def setup(self, referenced_files, basedir): + # type: (List[Any], unicode) -> None + self.visitlisting(referenced_files, self.stagedir, basedir) -- 2.30.2