10165: Handle copying complete collection contents into subdirectory.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 27af075f3652b4180e23906ed2fc914812036534..5bdffd5048a4625a8ae240d2e16684f0c6de3384 100644 (file)
@@ -9,30 +9,38 @@ import os
 import sys
 import threading
 import hashlib
 import sys
 import threading
 import hashlib
+import copy
+import json
 from functools import partial
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
 from functools import partial
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
+import schema_salad
 
 import arvados
 
 import arvados
-import arvados.events
 import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
+from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .fsaccess import CollectionFsAccess
+from .perf import Perf
+from .pathmapper import FinalOutputPathMapper
 
 
+from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.pathmapper import adjustFileObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 logger.setLevel(logging.INFO)
 
 logger.setLevel(logging.INFO)
 
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
@@ -50,6 +58,10 @@ class ArvCwlRunner(object):
         self.num_retries = 4
         self.uuid = None
         self.work_api = work_api
         self.num_retries = 4
         self.uuid = None
         self.work_api = work_api
+        self.stop_polling = threading.Event()
+        self.poll_api = None
+        self.pipeline = None
+        self.final_output_collection = None
 
         if self.work_api is None:
             # todo: autodetect API to use.
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -58,9 +70,12 @@ class ArvCwlRunner(object):
         if self.work_api not in ("containers", "jobs"):
             raise Exception("Unsupported API '%s'" % self.work_api)
 
         if self.work_api not in ("containers", "jobs"):
             raise Exception("Unsupported API '%s'" % self.work_api)
 
-    def arvMakeTool(self, toolpath_object, **kwargs):
+    def arv_make_tool(self, toolpath_object, **kwargs):
+        kwargs["work_api"] = self.work_api
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
+            return ArvadosCommandTool(self, toolpath_object, **kwargs)
+        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
+            return ArvadosWorkflow(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
@@ -94,11 +109,47 @@ class ArvCwlRunner(object):
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        with Perf(metrics, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def poll_states(self):
+        """Poll status of jobs or containers listed in the processes dict.
+
+        Runs in a separate thread.
+        """
+
+        while True:
+            self.stop_polling.wait(15)
+            if self.stop_polling.is_set():
+                break
+            with self.lock:
+                keys = self.processes.keys()
+            if not keys:
+                continue
+
+            if self.work_api == "containers":
+                table = self.poll_api.containers()
+            elif self.work_api == "jobs":
+                table = self.poll_api.jobs()
+
+            try:
+                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.warn("Error checking states on API server: %s", e)
+                continue
+
+            for p in proc_states["items"]:
+                self.on_message({
+                    "object_uuid": p["uuid"],
+                    "event_type": "update",
+                    "properties": {
+                        "new_attributes": p
+                    }
+                })
+
     def get_uploaded(self):
         return self.uploaded.copy()
 
     def get_uploaded(self):
         return self.uploaded.copy()
 
@@ -115,15 +166,56 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
             for v in obj:
                 self.check_writable(v)
 
-    def arvExecutor(self, tool, job_order, **kwargs):
+    def make_output_collection(self, name, outputObj):
+        outputObj = copy.deepcopy(outputObj)
+
+        files = []
+        def capture(fileobj):
+            files.append(fileobj)
+
+        adjustDirObjs(outputObj, capture)
+        adjustFileObjs(outputObj, capture)
+
+        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+
+        final = arvados.collection.Collection()
+
+        srccollections = {}
+        for k,v in generatemapper.items():
+            sp = k.split("/")
+            srccollection = sp[0][5:]
+            if srccollection not in srccollections:
+                srccollections[srccollection] = arvados.collection.CollectionReader(srccollection)
+            reader = srccollections[srccollection]
+            try:
+                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except IOError as e:
+                logger.warn("While preparing output collection: %s", e)
+
+        def rewrite(fileobj):
+            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+            for k in ("basename", "size", "listing"):
+                if k in fileobj:
+                    del fileobj[k]
+
+        adjustDirObjs(outputObj, rewrite)
+        adjustFileObjs(outputObj, rewrite)
+
+        with final.open("cwl.output.json", "w") as f:
+            json.dump(outputObj, f, sort_keys=True, indent=4)
+
+        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+
+        logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())
+
+        self.final_output_collection = final
+
+    def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
         tool.visit(self.check_writable)
 
         self.debug = kwargs.get("debug")
 
         tool.visit(self.check_writable)
 
-        if kwargs.get("quiet"):
-            logger.setLevel(logging.WARN)
-            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
@@ -136,7 +228,9 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
-        self.debug = kwargs.get("debug")
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
@@ -182,12 +276,9 @@ class ArvCwlRunner(object):
             runnerjob.run()
             return runnerjob.uuid
 
             runnerjob.run()
             return runnerjob.uuid
 
-        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
-
-        if self.work_api == "containers":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
-        if self.work_api == "jobs":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        self.poll_api = arvados.api('v1')
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
 
         if runnerjob:
             jobiter = iter((runnerjob,))
 
         if runnerjob:
             jobiter = iter((runnerjob,))
@@ -204,20 +295,25 @@ class ArvCwlRunner(object):
             # except when in cond.wait(), at which point on_message can update
             # job state and process output callbacks.
 
             # except when in cond.wait(), at which point on_message can update
             # job state and process output callbacks.
 
+            loopperf = Perf(metrics, "jobiter")
+            loopperf.__enter__()
             for runnable in jobiter:
             for runnable in jobiter:
+                loopperf.__exit__()
                 if runnable:
                 if runnable:
-                    runnable.run(**kwargs)
+                    with Perf(metrics, "run"):
+                        runnable.run(**kwargs)
                 else:
                     if self.processes:
                         self.cond.wait(1)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
                 else:
                     if self.processes:
                         self.cond.wait(1)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
+                loopperf.__enter__()
+            loopperf.__exit__()
 
             while self.processes:
                 self.cond.wait(1)
 
 
             while self.processes:
                 self.cond.wait(1)
 
-            events.close()
         except UnsupportedRequirement:
             raise
         except:
         except UnsupportedRequirement:
             raise
         except:
@@ -233,6 +329,8 @@ class ArvCwlRunner(object):
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
+            self.stop_polling.set()
+            self.polling_thread.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -243,6 +341,12 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
+        if kwargs.get("submit"):
+            logger.info("Final output collection %s", runnerjob.final_output)
+        else:
+            self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
+                                        self.final_output)
+
         if kwargs.get("compute_checksum"):
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         if kwargs.get("compute_checksum"):
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
@@ -280,6 +384,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
+    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
+
     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
     exgroup = parser.add_mutually_exclusive_group()
     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -290,7 +396,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="enable_reuse",
                         help="")
 
                         default=True, dest="enable_reuse",
                         help="")
 
-    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -301,6 +407,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -321,15 +429,27 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     return parser
 
 
     return parser
 
+def add_arv_hints():
+    cache = {}
+    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
+    cache["http://arvados.org/cwl"] = res.read()
+    res.close()
+    _, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
+    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
+    for n in extnames.names:
+        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
+            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
 
 def main(args, stdout, stderr, api_client=None):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
 def main(args, stdout, stderr, api_client=None):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if arvargs.create_template and not arvargs.job_order:
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
         job_order_object = ({}, "")
 
+    add_arv_hints()
+
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
@@ -338,14 +458,25 @@ def main(args, stdout, stderr, api_client=None):
         logger.error(e)
         return 1
 
         logger.error(e)
         return 1
 
+    if arvargs.debug:
+        logger.setLevel(logging.DEBUG)
+
+    if arvargs.quiet:
+        logger.setLevel(logging.WARN)
+        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
+    if arvargs.metrics:
+        metrics.setLevel(logging.DEBUG)
+        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
+
     arvargs.conformance_test = None
     arvargs.use_container = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
     arvargs.conformance_test = None
     arvargs.use_container = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
-                             executor=runner.arvExecutor,
-                             makeTool=runner.arvMakeTool,
+                             executor=runner.arv_executor,
+                             makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
                              make_fs_access=partial(CollectionFsAccess, api_client=api_client))
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
                              make_fs_access=partial(CollectionFsAccess, api_client=api_client))