10165: Always make an output collection when a workflow completes.
author: Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 30 Sep 2016 18:16:32 +0000 (14:16 -0400)
committer: Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 30 Sep 2016 18:44:51 +0000 (14:44 -0400)
crunch_scripts/cwl-runner
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/runner.py

index 89699f5eaf765aa997a223b2704934c4d5b38da7..3f2d53d038576b0694a3eea257c8b396ca9e16e0 100755 (executable)
@@ -72,36 +72,14 @@ try:
     args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
     outputObj = runner.arv_executor(t, job_order_object, **vars(args))
 
-    files = {}
-    def capture(fileobj):
-        path = fileobj["location"]
-        sp = path.split("/")
-        col = sp[0][5:]
-        if col not in files:
-            files[col] = set()
-        files[col].add("/".join(sp[1:]))
-        fileobj["location"] = path
-
-    adjustFileObjs(outputObj, capture)
-
-    final = arvados.collection.Collection()
-
-    for k,v in files.iteritems():
-        with arvados.collection.Collection(k) as c:
-            for f in c:
-                final.copy(f, f, c, True)
-
-    def makeRelative(fileobj):
-        fileobj["location"] = "/".join(fileobj["location"].split("/")[1:])
-
-    adjustFileObjs(outputObj, makeRelative)
-
-    with final.open("cwl.output.json", "w") as f:
-        json.dump(outputObj, f, indent=4)
+    if runner.final_output_collection:  # collection assembled by ArvCwlRunner.make_output_collection()
+        outputCollection = runner.final_output_collection.portable_data_hash()
+    else:
+        outputCollection = None  # executor did not produce an output collection
 
     api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                          body={
-                                             'output': final.save_new(create_collection_record=False),
+                                             'output': outputCollection,  # PDH of the saved output collection (or None)
                                              'success': True,
                                              'progress':1.0
                                          }).execute()
index 5262cb4971aaa09c002c1d52a56cc1899bf9ed4b..0452486b2aa63397ac502e74260cdb9b93192b77 100644 (file)
@@ -9,6 +9,8 @@ import os
 import sys
 import threading
 import hashlib
+import copy
+import json
 from functools import partial
 import pkg_resources  # part of setuptools
 
@@ -26,10 +28,11 @@ from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .perf import Perf
-from cwltool.pack import pack
+from .pathmapper import InitialWorkDirPathMapper
 
+from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.pathmapper import adjustFileObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
@@ -58,6 +61,7 @@ class ArvCwlRunner(object):
         self.stop_polling = threading.Event()
         self.poll_api = None
         self.pipeline = None
+        self.final_output_collection = None  # set by make_output_collection() after the workflow completes
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -162,6 +166,48 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
+    def make_output_collection(self, name, outputObj):  # build & save a collection holding the workflow outputs
+        outputObj = copy.deepcopy(outputObj)  # work on a copy; locations are rewritten below
+
+        files = []
+        def capture(fileobj):  # collect every File/Directory object found in the output
+            files.append(fileobj)
+
+        adjustDirObjs(outputObj, capture)
+        adjustFileObjs(outputObj, capture)
+
+        generatemapper = InitialWorkDirPathMapper(files, "", "",
+                                                  separateDirs=False)
+
+        final = arvados.collection.Collection()
+
+        srccollections = {}  # cache of CollectionReaders keyed by source collection locator
+        for k,v in generatemapper.items():
+            sp = k.split("/")
+            srccollection = sp[0][5:]  # strip the "keep:" scheme prefix
+            if srccollection not in srccollections:
+                srccollections[srccollection] = arvados.collection.CollectionReader(srccollection)
+            reader = srccollections[srccollection]
+            try:
+                final.copy("/".join(sp[1:]), v.target, source_collection=reader, overwrite=False)
+            except IOError as e:
+                logger.warn("While preparing output collection: %s", e)
+
+        def rewrite(fileobj):  # point locations at paths inside the assembled collection
+            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+
+        adjustDirObjs(outputObj, rewrite)
+        adjustFileObjs(outputObj, rewrite)
+
+        with final.open("cwl.output.json", "w") as f:
+            json.dump(outputObj, f, indent=4)
+
+        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+
+        logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())
+
+        self.final_output_collection = final
+
     def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
@@ -295,6 +341,12 @@ class ArvCwlRunner(object):
         if kwargs.get("compute_checksum"):
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
+        if kwargs.get("submit"):
+            logger.info("Final output collection %s", runnerjob.final_output)  # NOTE(review): assumes runnerjob is bound whenever "submit" is set -- confirm
+        else:
+            self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
+                                        self.final_output)  # assemble and save the output collection locally
+
         return self.final_output
 
 
index 07f85bbc479d7104cb9308f3013eea75c829b758..7828bfdd651a53c8f871b9e24be6188a9fd73cda 100644 (file)
@@ -121,6 +121,7 @@ class Runner(object):
         self.running = False
         self.enable_reuse = enable_reuse
         self.uuid = None
+        self.final_output = None  # output collection locator of the runner job; set in done()
 
     def update_pipeline_component(self, record):
         pass
@@ -169,7 +170,8 @@
         outputs = None
         try:
             try:
-                outc = arvados.collection.Collection(record["output"])
+                self.final_output = record["output"]  # remember the job's output collection locator
+                outc = arvados.collection.Collection(self.final_output)
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
                 def keepify(fileobj):