Merge branch '10088-cwl-dedup-deps' refs #10088
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index abe916f17ae518a6b8a4259a814f2ba2cd9b64a2..7bfdba8b80e48d9cb24d7054933b428aa3729764 100644 (file)
@@ -17,13 +17,14 @@ import cwltool.main
 import cwltool.workflow
 
 import arvados
-import arvados.events
 import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
+from .perf import Perf
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
@@ -50,6 +51,8 @@ class ArvCwlRunner(object):
         self.num_retries = 4
         self.uuid = None
         self.work_api = work_api
+        self.stop_polling = threading.Event()
+        self.poll_api = None
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -60,7 +63,8 @@ class ArvCwlRunner(object):
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
+            kwargs["work_api"] = self.work_api
+            return ArvadosCommandTool(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
@@ -94,11 +98,47 @@ class ArvCwlRunner(object):
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        with Perf(logger, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def poll_states(self):
+        """Poll status of jobs or containers listed in the processes dict.
+
+        Runs in a separate thread.
+        """
+
+        while True:
+            self.stop_polling.wait(15)
+            if self.stop_polling.is_set():
+                break
+            with self.lock:
+                keys = self.processes.keys()
+            if not keys:
+                continue
+
+            if self.work_api == "containers":
+                table = self.poll_api.containers()
+            elif self.work_api == "jobs":
+                table = self.poll_api.jobs()
+
+            try:
+                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.warn("Error checking states on API server: %s", e)
+                continue
+
+            for p in proc_states["items"]:
+                self.on_message({
+                    "object_uuid": p["uuid"],
+                    "event_type": "update",
+                    "properties": {
+                        "new_attributes": p
+                    }
+                })
+
     def get_uploaded(self):
         return self.uploaded.copy()
 
@@ -124,6 +164,9 @@ class ArvCwlRunner(object):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+        if self.debug:
+            logger.setLevel(logging.DEBUG)
+
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
@@ -136,7 +179,9 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
-        self.debug = kwargs.get("debug")
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
@@ -150,6 +195,7 @@ class ArvCwlRunner(object):
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
+            kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
@@ -181,12 +227,9 @@ class ArvCwlRunner(object):
             runnerjob.run()
             return runnerjob.uuid
 
-        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
-
-        if self.work_api == "containers":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
-        if self.work_api == "jobs":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        self.poll_api = arvados.api('v1')
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
 
         if runnerjob:
             jobiter = iter((runnerjob,))
@@ -205,7 +248,8 @@ class ArvCwlRunner(object):
 
             for runnable in jobiter:
                 if runnable:
-                    runnable.run(**kwargs)
+                    with Perf(logger, "run"):
+                        runnable.run(**kwargs)
                 else:
                     if self.processes:
                         self.cond.wait(1)
@@ -216,7 +260,6 @@ class ArvCwlRunner(object):
             while self.processes:
                 self.cond.wait(1)
 
-            events.close()
         except UnsupportedRequirement:
             raise
         except:
@@ -232,6 +275,8 @@ class ArvCwlRunner(object):
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
+            self.stop_polling.set()
+            self.polling_thread.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -289,7 +334,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="enable_reuse",
                         help="")
 
-    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -300,6 +345,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -326,7 +373,7 @@ def main(args, stdout, stderr, api_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if arvargs.create_template and not arvargs.job_order:
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     try: