class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
- self.task_queue = Queue.Queue()
+ self.task_queue = Queue.Queue(maxsize=self.thread_count)
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
with self.lock:
self.in_flight -= 1
- def add(self, task):
+ def add(self, task, unlock, check_done):
with self.lock:
if self.thread_count > 1:
self.in_flight += 1
- self.task_queue.put(task)
else:
task()
+ return
+
+ while True:
+ try:
+ unlock.release()
+ self.task_queue.put(task, block=True, timeout=3)
+ return
+ except Queue.Full:
+ if check_done.is_set():
+ return
+ finally:
+ unlock.acquire()
+
def drain(self):
try: