13108: Refactor task queue into its own class.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index f70fa65fb9d8ea6018e726cfe3ee9489f53d83a2..7affade0734536fb3e7ee241bed9357a995eb949 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,6 +40,7 @@ from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
 from ._version import __version__
 
 from cwltool.pack import pack
@@ -65,11 +66,9 @@ class ArvCwlRunner(object):
     """
 
     def __init__(self, api_client, work_api=None, keep_client=None,
-                 output_name=None, output_tags=None, num_retries=4,
-                 parallel_submit_count=4):
+                 output_name=None, output_tags=None, num_retries=4):
         self.api = api_client
         self.processes = {}
-        self.in_flight = 0
         self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
@@ -86,9 +85,7 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.task_queue = Queue.Queue()
-        self.task_queue_threads = []
-        self.parallel_submit_count = parallel_submit_count
+        self.thread_count = 4
         self.poll_interval = 12
 
         if keep_client is not None:
@@ -144,29 +141,15 @@ class ArvCwlRunner(object):
                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
             self.final_status = processStatus
             self.final_output = out
+            self.workflow_eval_lock.notifyAll()
 
-    def task_queue_func(self):
-        while True:
-            task = self.task_queue.get()
-            if task is None:
-                return
-            task()
-
-    def task_queue_add(self, task):
-        if self.parallel_submit_count > 1:
-            self.task_queue.put(task)
-        else:
-            task()
 
     def start_run(self, runnable, kwargs):
-        with self.workflow_eval_lock:
-            self.in_flight += 1
-        self.task_queue_add(partial(runnable.run, **kwargs))
+        self.task_queue.add(partial(runnable.run, **kwargs))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
             self.processes[container.uuid] = container
-            self.in_flight -= 1
 
     def process_done(self, uuid):
         with self.workflow_eval_lock:
@@ -176,6 +159,7 @@ class ArvCwlRunner(object):
     def wrapped_callback(self, cb, obj, st):
         with self.workflow_eval_lock:
             cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
 
     def get_wrapped_callback(self, cb):
         return partial(self.wrapped_callback, cb)
@@ -183,24 +167,19 @@ class ArvCwlRunner(object):
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
+                uuid = event["object_uuid"]
+                if event["properties"]["new_attributes"]["state"] == "Running":
                     with self.workflow_eval_lock:
                         j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
-                        j.running = True
-                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                        if j.running is False:
+                            j.running = True
+                            j.update_pipeline_component(event["properties"]["new_attributes"])
+                            logger.info("%s %s is Running", self.label(j), uuid)
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    uuid = event["object_uuid"]
                     with self.workflow_eval_lock:
                         j = self.processes[uuid]
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                    def done_cb():
-                        j.done(event["properties"]["new_attributes"])
-                        with self.workflow_eval_lock:
-                            self.workflow_eval_lock.notify()
-                    self.task_queue_add(done_cb)
-
+                    self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -251,7 +230,7 @@ class ArvCwlRunner(object):
             logger.exception("Fatal error in state polling thread.")
             with self.workflow_eval_lock:
                 self.processes.clear()
-                self.workflow_eval_lock.notify()
+                self.workflow_eval_lock.notifyAll()
         finally:
             self.stop_polling.set()
 
@@ -407,6 +386,7 @@ class ArvCwlRunner(object):
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
         self.secret_store = kwargs.get("secret_store")
+        self.thread_count = kwargs.get("thread_count", 4)
 
         self.trash_intermediate = kwargs["trash_intermediate"]
         if self.trash_intermediate and self.work_api != "containers":
@@ -528,17 +508,14 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run(wait=kwargs.get("wait"))
+            runnerjob.run(**kwargs)
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
-        for r in xrange(0, self.parallel_submit_count):
-            t = threading.Thread(target=self.task_queue_func)
-            self.task_queue_threads.append(t)
-            t.start()
+        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
 
         if runnerjob:
             jobiter = iter((runnerjob,))
@@ -564,19 +541,24 @@ class ArvCwlRunner(object):
                 if self.stop_polling.is_set():
                     break
 
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+
                 if runnable:
                     with Perf(metrics, "run"):
                         self.start_run(runnable, kwargs)
                 else:
-                    if (self.in_flight + len(self.processes)) > 0:
+                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                         self.workflow_eval_lock.wait(3)
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while (self.in_flight + len(self.processes)) > 0:
+            while (self.task_queue.in_flight + len(self.processes)) > 0:
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
                 self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
@@ -594,18 +576,10 @@ class ArvCwlRunner(object):
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.workflow_eval_lock.release()
-            try:
-                # Drain queue
-                while not self.task_queue.empty():
-                    self.task_queue.get()
-            except Queue.Empty:
-                pass
+            self.task_queue.drain()
             self.stop_polling.set()
             self.polling_thread.join()
-            for t in self.task_queue_threads:
-                self.task_queue.put(None)
-            for t in self.task_queue_threads:
-                t.join()
+            self.task_queue.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -755,8 +729,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_true", default=False,
                         help=argparse.SUPPRESS)
 
-    parser.add_argument("--parallel-submit-count", type=int,
-                        default=4, help="Submit requests in parallel (default 4)")
+    parser.add_argument("--thread-count", type=int,
+                        default=4, help="Number of threads to use for job submit and output collection.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",