10165: Add FinalOutputPathMapper. Delete basename/size/listing from output object...
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 30 Sep 2016 19:59:51 +0000 (15:59 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 30 Sep 2016 20:49:52 +0000 (16:49 -0400)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/pathmapper.py

index 0452486b2aa63397ac502e74260cdb9b93192b77..21d30191692814b62005fe9804ff2de92d389541 100644 (file)
@@ -28,7 +28,7 @@ from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .perf import Perf
-from .pathmapper import InitialWorkDirPathMapper
+from .pathmapper import FinalOutputPathMapper
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement
@@ -176,8 +176,7 @@ class ArvCwlRunner(object):
         adjustDirObjs(outputObj, capture)
         adjustFileObjs(outputObj, capture)
 
-        generatemapper = InitialWorkDirPathMapper(files, "", "",
-                                                  separateDirs=False)
+        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
 
         final = arvados.collection.Collection()
 
@@ -194,13 +193,19 @@ class ArvCwlRunner(object):
                 logger.warn("While preparing output collection: %s", e)
 
         def rewrite(fileobj):
-            fileobj["location"] = generatemapper.mapper(fileobj["location"])
+            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+            if "basename" in fileobj:
+                del fileobj["basename"]
+            if "size" in fileobj:
+                del fileobj["size"]
+            if "listing" in fileobj:
+                del fileobj["listing"]
 
-        adjustDirObjs(outputObj, capture)
-        adjustFileObjs(outputObj, capture)
+        adjustDirObjs(outputObj, rewrite)
+        adjustFileObjs(outputObj, rewrite)
 
         with final.open("cwl.output.json", "w") as f:
-            json.dump(outputObj, f, indent=4)
+            json.dump(outputObj, f, sort_keys=True, indent=4)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
 
@@ -338,15 +343,20 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if kwargs.get("compute_checksum"):
-            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
-
         if kwargs.get("submit"):
             logger.info("Final output collection %s", runnerjob.final_output)
         else:
+            # rewrite() in make_output_collection overwrites "location" and
+            # deletes basename/size/listing in place; hand it a deep copy so
+            # self.final_output keeps its original locations for the checksum
+            # pass below and for the return value.
+            import copy
             self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
-                                        self.final_output)
+                                        copy.deepcopy(self.final_output))
 
+        if kwargs.get("compute_checksum"):
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
         return self.final_output
 
 
index 0fd9a0ed332ececaf4474ca3939fe9b4785db82d..bd71a133550005c922ea1338012df5a9013a8b2c 100644 (file)
@@ -150,11 +150,11 @@ class InitialWorkDirPathMapper(PathMapper):
 
     def visit(self, obj, stagedir, basedir, copy=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
+        loc = obj["location"]
         if obj["class"] == "Directory":
-            self._pathmap[obj["location"]] = MapperEnt(obj["location"], stagedir, "Directory")
+            self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
             self.visitlisting(obj.get("listing", []), stagedir, basedir)
         elif obj["class"] == "File":
-            loc = obj["location"]
             if loc in self._pathmap:
                 return
             tgt = os.path.join(stagedir, obj["basename"])
@@ -172,10 +172,40 @@ class InitialWorkDirPathMapper(PathMapper):
 
         # Go through each file and set the target to its own directory along
         # with any secondary files.
-        stagedir = self.stagedir
-        for fob in referenced_files:
-            self.visit(fob, stagedir, basedir)
+        self.visitlisting(referenced_files, self.stagedir, basedir)
 
         for path, (ab, tgt, type) in self._pathmap.items():
             if type in ("File", "Directory") and ab.startswith("keep:"):
                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
+
+
+class FinalOutputPathMapper(PathMapper):
+    """Map output File/Directory objects onto paths in the final output collection.
+
+    Everything goes into one staging directory, with no per-object
+    subdirectories: a File is mapped to ``stagedir/<basename>`` and a
+    Directory is mapped to ``stagedir`` as a whole (its listing is not walked).
+    """
+
+    def visit(self, obj, stagedir, basedir, copy=False):
+        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
+        location = obj["location"]
+        kind = obj["class"]
+        if kind == "Directory":
+            # NOTE(review): the target is stagedir itself, not
+            # stagedir/<basename>; confirm the output copy step expects this.
+            self._pathmap[location] = MapperEnt(location, stagedir, "Directory")
+            return
+        if kind != "File" or location in self._pathmap:
+            # Other classes are ignored; a File reached twice (e.g. also
+            # listed as a secondaryFile) keeps the target it got first.
+            return
+        target = os.path.join(stagedir, obj["basename"])
+        self._pathmap[location] = MapperEnt(location, target, "File")
+        # secondaryFiles are mapped into the same stagedir as their primary.
+        self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+
+    def setup(self, referenced_files, basedir):
+        # type: (List[Any], unicode) -> None
+        """Map every referenced object into the single top-level stagedir."""
+        self.visitlisting(referenced_files, self.stagedir, basedir)