X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/080c940d7a8134a6e277a53b7e45eb27e2b2c87f..cabf89d1fd8b40a2624d101a95c6587bfdd91fed:/sdk/cwl/arvados_cwl/task_queue.py diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py index b9fd09807b..d75fec6c63 100644 --- a/sdk/cwl/arvados_cwl/task_queue.py +++ b/sdk/cwl/arvados_cwl/task_queue.py @@ -2,7 +2,12 @@ # # SPDX-License-Identifier: Apache-2.0 -import Queue +from future import standard_library +standard_library.install_aliases() +from builtins import range +from builtins import object + +import queue import threading import logging @@ -11,46 +16,58 @@ logger = logging.getLogger('arvados.cwl-runner') class TaskQueue(object): def __init__(self, lock, thread_count): self.thread_count = thread_count - self.task_queue = Queue.Queue() + self.task_queue = queue.Queue(maxsize=self.thread_count) self.task_queue_threads = [] self.lock = lock self.in_flight = 0 self.error = None - for r in xrange(0, self.thread_count): + for r in range(0, self.thread_count): t = threading.Thread(target=self.task_queue_func) self.task_queue_threads.append(t) t.start() def task_queue_func(self): + while True: + task = self.task_queue.get() + if task is None: + return + try: + task() + except Exception as e: + logger.exception("Unhandled exception running task") + self.error = e - while True: - task = self.task_queue.get() - if task is None: - return - try: - task() - except Exception as e: - logger.exception("Unhandled exception running task") - self.error = e - - with self.lock: - self.in_flight -= 1 - - def add(self, task): - with self.lock: - if self.thread_count > 1: + with self.lock: + self.in_flight -= 1 + + def add(self, task, unlock, check_done): + if self.thread_count > 1: + with self.lock: self.in_flight += 1 - self.task_queue.put(task) - else: - task() + else: + task() + return + + while True: + try: + unlock.release() + if check_done.is_set(): + return + self.task_queue.put(task, block=True, timeout=3) + return + except queue.Full: + pass + finally: + unlock.acquire() + def drain(self): try: # Drain queue while not self.task_queue.empty(): self.task_queue.get(True, .1) - except Queue.Empty: + except queue.Empty: pass def join(self):