13108: Collect outputs in separate threads as well
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 2792b30fbd4a5501c92f9b2a001bbbebf5b1dc8d..c0e919c6ce9c52fc0f6579f00d211f221e569757 100644 (file)
@@ -17,6 +17,8 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
+import time
 
 from cwltool.errors import WorkflowException
 import cwltool.main
@@ -62,11 +64,13 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+    def __init__(self, api_client, work_api=None, keep_client=None,
+                 output_name=None, output_tags=None, num_retries=4,
+                 parallel_submit_count=4):
         self.api = api_client
         self.processes = {}
-        self.lock = threading.Lock()
-        self.cond = threading.Condition(self.lock)
+        self.in_flight = 0
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
@@ -82,6 +86,10 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
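+        # Work queue and thread pool used to submit processes and
+        # collect outputs off the main workflow evaluation thread.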
+        self.runnable_queue = Queue.Queue()
+        self.runnable_queue_threads = []
+        self.parallel_submit_count = parallel_submit_count
+        self.poll_interval = 12
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -123,40 +131,70 @@ class ArvCwlRunner(object):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.warn("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+
+    def runnable_queue_func(self):
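+        # Worker loop: run callables pulled from the queue until a
+        # None sentinel signals this thread to exit.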
+        while True:
+            task = self.runnable_queue.get()
+            if task is None:
+                return
+            try:
+                task()
+            except Exception:
+                logger.exception("Unhandled exception running queued task")
+
+    def start_run(self, runnable, kwargs):
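+        # Count the submission as in flight before queueing it so the
+        # main loop doesn't mistake a pending submission for a deadlock.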
+        with self.workflow_eval_lock:
+            self.in_flight += 1
+        self.runnable_queue.put(partial(runnable.run, **kwargs))
+
+    def process_submitted(self, container):
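+        # Submission finished: move the process from "in flight" to the
+        # table of running processes tracked by the poll thread.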
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+            self.in_flight -= 1
+
+    def process_done(self, uuid):
+        with self.workflow_eval_lock:
+            if uuid in self.processes:
+                del self.processes[uuid]
+
+    def wrapped_callback(self, cb, obj, st):
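+        # Serialize callbacks from cwltool by taking the workflow
+        # evaluation lock before invoking them.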
+        with self.workflow_eval_lock:
+            cb(obj, st)
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
 
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                     uuid = event["object_uuid"]
-                    with self.lock:
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         logger.info("%s %s is Running", self.label(j), uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                     uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(metrics, "done %s" % j.name):
-                            j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
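+                    # Collect the output on a worker thread instead of
+                    # blocking the event-handling thread on j.done().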
+                    def done_cb():
+                        j.done(event["properties"]["new_attributes"])
+                        with self.workflow_eval_lock:
+                            self.workflow_eval_lock.notify()
+                    self.runnable_queue.put(done_cb)
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -168,15 +206,19 @@ class ArvCwlRunner(object):
         """
 
         try:
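+            # Track how long each poll takes and shorten the next wait
+            # accordingly, keeping a steady polling cadence.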
+            remain_wait = self.poll_interval
             while True:
-                self.stop_polling.wait(15)
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
+                    remain_wait = self.poll_interval
                     continue
 
+                begin_poll = time.time()
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
@@ -186,6 +228,7 @@ class ArvCwlRunner(object):
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
+                    remain_wait = self.poll_interval
                     continue
 
                 for p in proc_states["items"]:
@@ -196,12 +239,13 @@ class ArvCwlRunner(object):
                             "new_attributes": p
                         }
                     })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
-            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notify()
         finally:
             self.stop_polling.set()
 
@@ -377,11 +421,13 @@ class ArvCwlRunner(object):
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
+        # Don't validate this time because it will just print redundant errors.
         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                   makeTool=self.arv_make_tool,
                                   loader=tool.doc_loader,
                                   avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata)
+                                  metadata=tool.metadata,
+                                  do_validate=False)
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % kwargs["name"],
@@ -455,7 +501,7 @@ class ArvCwlRunner(object):
                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                 merged_map=merged_map,
                                                 priority=kwargs.get("priority"),
-                                                secret_store=kwargs.get("secret_store"))
+                                                secret_store=self.secret_store)
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                       self.output_name,
@@ -483,6 +529,11 @@ class ArvCwlRunner(object):
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
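+        # Start a small pool of worker threads to submit processes and
+        # collect outputs in parallel.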
+        for _ in xrange(self.parallel_submit_count):
+            t = threading.Thread(target=self.runnable_queue_func)
+            self.runnable_queue_threads.append(t)
+            t.start()
+
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
@@ -493,10 +544,11 @@ class ArvCwlRunner(object):
                                **kwargs)
 
         try:
-            self.cond.acquire()
-            # Will continue to hold the lock for the duration of this code
-            # except when in cond.wait(), at which point on_message can update
-            # job state and process output callbacks.
+            self.workflow_eval_lock.acquire()
+            # Hold the lock while this code runs; it is released only
+            # inside self.workflow_eval_lock.wait(), at which point
+            # on_message can update job state and process output
+            # callbacks.
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
@@ -508,10 +560,10 @@ class ArvCwlRunner(object):
 
                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, kwargs)
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
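+                    # Processes queued for submission but not yet
+                    # running also count as pending work.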
+                    if (self.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(3)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
@@ -519,7 +571,7 @@ class ArvCwlRunner(object):
             loopperf.__exit__()
 
             while self.processes:
-                self.cond.wait(1)
+                self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
             raise
@@ -535,9 +587,19 @@ class ArvCwlRunner(object):
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
+            try:
+                # Drain tasks that were queued but never started;
+                # get_nowait() raises Queue.Empty instead of blocking
+                # if another thread empties the queue first.
+                while not self.runnable_queue.empty():
+                    self.runnable_queue.get_nowait()
+            except Queue.Empty:
+                pass
             self.stop_polling.set()
             self.polling_thread.join()
+            for _ in self.runnable_queue_threads:
+                self.runnable_queue.put(None)
+            for t in self.runnable_queue_threads:
+                t.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -679,6 +741,17 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
                         default=DEFAULT_PRIORITY)
 
+    parser.add_argument("--disable-validate", dest="do_validate",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--disable-js-validation",
+                        action="store_true", default=False,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--parallel-submit-count", type=int,
+                        default=4, help="Submit requests in parallel (default 4)")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
@@ -735,7 +808,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     try:
         if api_client is None:
-            api_client=arvados.api('v1', model=OrderedJsonModel())
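+            # The API client is shared across worker threads, so use
+            # the thread-safe wrapper, which also provides a KeepClient.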
+            api_client = arvados.safeapi.ThreadSafeApiCache(
+                api_params={"model": OrderedJsonModel()},
+                keep_params={"num_retries": 4})
+            keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,