14510: Perfomance fixes
[arvados.git] / sdk / cwl / arvados_cwl / task_queue.py
index b9fd09807b452c1b06738ef1a7df72fd9dcc8708..018172b591b761b90825c9160b366b28a35d6b94 100644 (file)
@@ -11,7 +11,7 @@ logger = logging.getLogger('arvados.cwl-runner')
 class TaskQueue(object):
     def __init__(self, lock, thread_count):
         self.thread_count = thread_count
-        self.task_queue = Queue.Queue()
+        self.task_queue = Queue.Queue(maxsize=self.thread_count)
         self.task_queue_threads = []
         self.lock = lock
         self.in_flight = 0
@@ -37,13 +37,25 @@ class TaskQueue(object):
                 with self.lock:
                     self.in_flight -= 1
 
-    def add(self, task):
+    def add(self, task, unlock, check_done):
         with self.lock:
             if self.thread_count > 1:
                 self.in_flight += 1
-                self.task_queue.put(task)
             else:
                 task()
+                return
+
+        while True:
+            try:
+                unlock.release()
+                self.task_queue.put(task, block=True, timeout=3)
+                return
+            except Queue.Full:
+                if check_done.is_set():
+                    return
+            finally:
+                unlock.acquire()
+
 
     def drain(self):
         try: