1 from future import standard_library
2 standard_library.install_aliases()
3 from builtins import range
4 from builtins import object
5 # Copyright (C) The Arvados Authors. All rights reserved.
7 # SPDX-License-Identifier: Apache-2.0
# Module-level logger, obtained under the name 'arvados.cwl-runner'.
13 logger = logging.getLogger('arvados.cwl-runner')
15 class TaskQueue(object):
# Set up the bounded task queue and create the worker thread objects.
# NOTE(review): `lock` is accepted but not referenced in the lines
# shown here -- confirm how it is used.
16 def __init__(self, lock, thread_count):
17 self.thread_count = thread_count
# The queue holds at most thread_count pending entries.
18 self.task_queue = queue.Queue(maxsize=self.thread_count)
# Every thread object created below is recorded in this list.
19 self.task_queue_threads = []
# One iteration per requested thread; the loop index itself is unused.
24 for r in range(0, self.thread_count):
# Each thread uses task_queue_func as its target.
25 t = threading.Thread(target=self.task_queue_func)
26 self.task_queue_threads.append(t)
# Thread target used by the Thread objects created in __init__.
29 def task_queue_func(self):
# Take the next entry from the shared queue. get() is called with no
# arguments, so it blocks until an entry is available.
31 task = self.task_queue.get()
# Catches any Exception subclass.
# NOTE(review): `e` is bound but not used in the lines shown here.
36 except Exception as e:
# logger.exception records the message together with the traceback.
37 logger.exception("Unhandled exception running task")
# Hand `task` to the task queue.
# NOTE(review): `unlock` is accepted but not referenced in the lines
# shown here -- confirm how it is used.
43 def add(self, task, unlock, check_done):
# Branch on whether more than one thread was configured.
44 if self.thread_count > 1:
# check_done is queried with is_set(), so it is presumably a
# threading.Event-like object -- TODO confirm against callers.
54 if check_done.is_set():
# Blocking put with a 3 second timeout: if the queue is still full
# when the timeout expires, put() raises queue.Full.
56 self.task_queue.put(task, block=True, timeout=3)
67 while not self.task_queue.empty():
68 self.task_queue.get(True, .1)
73 for t in self.task_queue_threads:
74 self.task_queue.put(None)
75 for t in self.task_queue_threads: