10460: Support for file and directory literals when constructing output
authorPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 4 Nov 2016 20:54:43 +0000 (16:54 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 4 Nov 2016 20:54:43 +0000 (16:54 -0400)
collection.  File locations in final collection reference final output
collection and not intermediate collections.

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/pathmapper.py

index 3144592fc98f47083581e06fabcf900517d7ab01..c221a322e383993821c3805f3e3cf0c025991319 100644 (file)
@@ -201,14 +201,28 @@ class ArvCwlRunner(object):
 
         srccollections = {}
         for k,v in generatemapper.items():
+            if k.startswith("_:"):
+                if v.type == "Directory":
+                    continue
+                if v.type == "CreateFile":
+                    with final.open(v.target, "wb") as f:
+                        f.write(v.resolved.encode("utf-8"))
+                    continue
+
+            if not k.startswith("keep:"):
+                raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
             if srccollection not in srccollections:
-                srccollections[srccollection] = arvados.collection.CollectionReader(
-                    srccollection,
-                    api_client=self.api,
-                    keep_client=self.keep_client,
-                    num_retries=self.num_retries)
+                try:
+                    srccollections[srccollection] = arvados.collection.CollectionReader(
+                        srccollection,
+                        api_client=self.api,
+                        keep_client=self.keep_client,
+                        num_retries=self.num_retries)
+                except arvados.errors.ArgumentError as e:
+                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                    raise
             reader = srccollections[srccollection]
             try:
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
@@ -218,7 +232,7 @@ class ArvCwlRunner(object):
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "size", "listing"):
+            for k in ("basename", "size", "listing", "contents"):
                 if k in fileobj:
                     del fileobj[k]
 
@@ -234,7 +248,14 @@ class ArvCwlRunner(object):
                     final.api_response()["name"],
                     final.manifest_locator())
 
+        def finalcollection(fileobj):
+            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+        adjustDirObjs(outputObj, finalcollection)
+        adjustFileObjs(outputObj, finalcollection)
+
         self.final_output_collection = final
+        return outputObj
 
     def set_crunch_output(self):
         if self.work_api == "containers":
@@ -390,7 +411,7 @@ class ArvCwlRunner(object):
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
-            self.make_output_collection(self.output_name, self.final_output)
+            self.final_output = self.make_output_collection(self.output_name, self.final_output)
             self.set_crunch_output()
 
         if self.final_status != "success":
index 73c81ceb0fcdb033203c1b7e5425b3875ea121d6..15685fcbac3a4391858184462fecb266b89b02fa 100644 (file)
@@ -187,14 +187,19 @@ class FinalOutputPathMapper(PathMapper):
     def visit(self, obj, stagedir, basedir, copy=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
+        tgt = os.path.join(stagedir, obj["basename"])
         if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
+            self._pathmap[loc] = MapperEnt(tgt, tgt, "Directory")
+            if loc.startswith("_:"):
+                self.visitlisting(obj.get("listing", []), tgt, basedir)
         elif obj["class"] == "File":
             if loc in self._pathmap:
                 return
-            tgt = os.path.join(stagedir, obj["basename"])
-            self._pathmap[loc] = MapperEnt(loc, tgt, "File")
-            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+            if "contents" in obj and obj["location"].startswith("_:"):
+                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
+            else:
+                self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
 
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None