1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import range
8 from builtins import object
# Module-level logger obtained under the name 'arvados.cwl-runner'.
14 logger = logging.getLogger('arvados.cwl-runner')
# TaskQueue pairs a bounded queue.Queue with a list of threading.Thread
# objects whose target is task_queue_func.
# NOTE(review): the embedded line numbers skip, so some statements of this
# class are not shown here; the comments below describe only what is visible.
16 class TaskQueue(object):
# Constructor: records thread_count, creates the bounded queue and builds one
# Thread object per worker.
# NOTE(review): `lock` is accepted but not used in the visible statements --
# confirm how it is stored/used.
17 def __init__(self, lock, thread_count):
18 self.thread_count = thread_count
# maxsize equals thread_count, so a blocking put() waits once that many
# items are already queued.
19 self.task_queue = queue.Queue(maxsize=self.thread_count)
20 self.task_queue_threads = []
# One Thread per worker slot, each targeting task_queue_func, collected in
# task_queue_threads.
# NOTE(review): the call that starts each thread is not visible here.
25 for r in range(0, self.thread_count):
26 t = threading.Thread(target=self.task_queue_func)
27 self.task_queue_threads.append(t)
# Thread target: fetches an item from the queue with a blocking get().
30 def task_queue_func(self):
32 task = self.task_queue.get()
# Exceptions reaching this handler are logged with their traceback via
# logger.exception.
# NOTE(review): the guarded statements and anything done with `e` afterwards
# are not visible here.
37 except Exception as e:
38 logger.exception("Unhandled exception running task")
# Submit `task` to the queue.
# check_done: object exposing is_set(); it is consulted before the put below.
# NOTE(review): the use of `unlock` is not visible in these lines -- confirm.
44 def add(self, task, unlock, check_done):
# Branch taken only when more than one worker thread is configured.
# NOTE(review): the bodies of both branches are not visible here.
45 if self.thread_count > 1:
55 if check_done.is_set():
# Blocking put that waits at most 3 seconds for a free slot; queue.Queue.put
# raises queue.Full on timeout.
# NOTE(review): the handling of queue.Full is not visible here.
57 self.task_queue.put(task, block=True, timeout=3)
# NOTE(review): the `def` line enclosing the next loop is not visible here.
# Removes queued items until empty() reports the queue empty; each get() waits
# up to 0.1 seconds and raises queue.Empty if nothing arrives in that time.
68 while not self.task_queue.empty():
69 self.task_queue.get(True, .1)
# NOTE(review): the `def` line enclosing the next loops is not visible here.
# Enqueues one None per worker thread -- presumably a shutdown sentinel
# consumed by task_queue_func; confirm against the full worker loop.
74 for t in self.task_queue_threads:
75 self.task_queue.put(None)
# Second pass over the worker threads.
# NOTE(review): the loop body is not visible here.
76 for t in self.task_queue_threads: