9701: Merge branch 'master' into 9701-collection-pack-small-files-alt
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 20bbc41bd32627d020f156c4b4e86ac31663971b..c90f8902684304b400cc7ece97068a8e6b094000 100644 (file)
@@ -9,6 +9,8 @@ import os
 import sys
 import threading
 import hashlib
+import copy
+import json
 from functools import partial
 import pkg_resources  # part of setuptools
 
@@ -22,28 +24,31 @@ import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .runner import Runner
 from .arvtool import ArvadosCommandTool
-from .arvworkflow import ArvadosWorkflow
+from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
-from .arvworkflow import upload_workflow
 from .perf import Perf
-from cwltool.pack import pack
+from .pathmapper import FinalOutputPathMapper
 
-from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.pathmapper import adjustFileObjs
+from cwltool.pack import pack
+from cwltool.process import shortname, UnsupportedRequirement, getListing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 logger.setLevel(logging.INFO)
 
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 
     """
 
-    def __init__(self, api_client, work_api=None):
+    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -56,6 +61,13 @@ class ArvCwlRunner(object):
         self.work_api = work_api
         self.stop_polling = threading.Event()
         self.poll_api = None
+        self.pipeline = None
+        self.final_output_collection = None
+        self.output_name = output_name
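+        # Use the caller's KeepClient if one was provided (e.g. from tests);
+        # otherwise create one bound to this API client.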
+        if keep_client is not None:
+            self.keep_client = keep_client
+        else:
+            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -103,7 +115,7 @@ class ArvCwlRunner(object):
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(logger, "done %s" % j.name):
+                        with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
@@ -160,6 +172,59 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
+    def make_output_collection(self, name, outputObj):
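+        """Assemble the outputs in outputObj into a single new Keep collection."""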
+        outputObj = copy.deepcopy(outputObj)
+
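+        # Gather every File and Directory object referenced by the outputs.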
+        files = []
+        def capture(fileobj):
+            files.append(fileobj)
+
+        adjustDirObjs(outputObj, capture)
+        adjustFileObjs(outputObj, capture)
+
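+        # Map each keep: reference to a destination path inside the new
+        # collection (separateDirs=False avoids per-file staging subdirectories).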
+        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+
+        final = arvados.collection.Collection(api_client=self.api,
+                                              keep_client=self.keep_client,
+                                              num_retries=self.num_retries)
+
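+        # Copy each mapped file out of its source collection, caching one
+        # CollectionReader per source; copies happen at the manifest level,
+        # so file data should not need to be re-uploaded through Keep.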
+        srccollections = {}
+        for k, v in generatemapper.items():
+            sp = k.split("/")
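+            # Locations look like "keep:<pdh>/<path>"; strip the 5-character "keep:" prefix.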
+            srccollection = sp[0][5:]
+            if srccollection not in srccollections:
+                srccollections[srccollection] = arvados.collection.CollectionReader(
+                    srccollection,
+                    api_client=self.api,
+                    keep_client=self.keep_client,
+                    num_retries=self.num_retries)
+            reader = srccollections[srccollection]
+            try:
+                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except IOError as e:
+                logger.warning("While preparing output collection: %s", e)
+
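+        # Rewrite locations to point into the new collection, dropping fields
+        # that no longer apply to the copied files.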
+        def rewrite(fileobj):
+            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+            for k in ("basename", "size", "listing"):
+                if k in fileobj:
+                    del fileobj[k]
+
+        adjustDirObjs(outputObj, rewrite)
+        adjustFileObjs(outputObj, rewrite)
+
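+        # Record the rewritten outputs as cwl.output.json inside the collection itself.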
+        with final.open("cwl.output.json", "w") as f:
+            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))
+
+        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+
+        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
+                    final.api_response()["name"],
+                    final.manifest_locator())
+
+        self.final_output_collection = final
+
     def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
@@ -168,7 +233,9 @@ class ArvCwlRunner(object):
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
-        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
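+        # Share this runner's KeepClient with the collection filesystem layer.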
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
+                                                                 api_client=self.api,
+                                                                 keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
         if kwargs.get("create_template"):
@@ -207,9 +274,9 @@ class ArvCwlRunner(object):
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
             else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
@@ -244,9 +311,12 @@ class ArvCwlRunner(object):
             # except when in cond.wait(), at which point on_message can update
             # job state and process output callbacks.
 
+            loopperf = Perf(metrics, "jobiter")
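+            # Enter/exit the Perf context manually so that only the time spent
+            # producing the next runnable job is charged to "jobiter", not the
+            # time spent running it.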
+            loopperf.__enter__()
             for runnable in jobiter:
+                loopperf.__exit__()
                 if runnable:
-                    with Perf(logger, "run"):
+                    with Perf(metrics, "run"):
                         runnable.run(**kwargs)
                 else:
                     if self.processes:
@@ -254,6 +324,8 @@ class ArvCwlRunner(object):
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
+                loopperf.__enter__()
+            loopperf.__exit__()
 
             while self.processes:
                 self.cond.wait(1)
@@ -285,7 +357,15 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
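+        # A submitted runner job/container assembles the output collection
+        # itself; for local runs, build it here from the final outputs.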
+        if kwargs.get("submit") and isinstance(runnerjob, Runner):
+            logger.info("Final output collection %s", runnerjob.final_output)
+        else:
+            if self.output_name is None:
+                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
+            self.make_output_collection(self.output_name, self.final_output)
+
         if kwargs.get("compute_checksum"):
+            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return self.final_output
@@ -322,6 +402,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
+    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
+
     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -333,6 +415,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="")
 
     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -368,14 +451,16 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 def add_arv_hints():
     cache = {}
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
-    cache["https://w3id.org/cwl/arv-cwl-schema.yml"] = res.read()
+    cache["http://arvados.org/cwl"] = res.read()
     res.close()
-    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
-    _, extnames, _, _ = schema_salad.schema.load_schema("https://w3id.org/cwl/arv-cwl-schema.yml", cache=cache)
+    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
+    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
     for n in extnames.names:
-        cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
+        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
+            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
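+        # Pre-seed the loader index so schema-salad treats each extension ID as
+        # already resolved instead of trying to fetch it from arvados.org.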
+        document_loader.idx["http://arvados.org/cwl#"+n] = {}
 
-def main(args, stdout, stderr, api_client=None):
+def main(args, stdout, stderr, api_client=None, keep_client=None):
     parser = arg_parser()
 
     job_order_object = None
@@ -388,7 +473,7 @@ def main(args, stdout, stderr, api_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
     except Exception as e:
         logger.error(e)
         return 1
@@ -400,6 +485,10 @@ def main(args, stdout, stderr, api_client=None):
         logger.setLevel(logging.WARN)
         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
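+    # --metrics enables debug output from the timing (Perf) loggers in both
+    # this runner and cwltool.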
+    if arvargs.metrics:
+        metrics.setLevel(logging.DEBUG)
+        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
+
     arvargs.conformance_test = None
     arvargs.use_container = True