1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
9 logger = logging.getLogger('arvados.cwl-runner')
11 class TaskQueue(object):
12 def __init__(self, lock, thread_count):
13 self.thread_count = thread_count
14 self.task_queue = Queue.Queue(maxsize=self.thread_count)
15 self.task_queue_threads = []
20 for r in xrange(0, self.thread_count):
21 t = threading.Thread(target=self.task_queue_func)
22 self.task_queue_threads.append(t)
25 def task_queue_func(self):
27 task = self.task_queue.get()
32 except Exception as e:
33 logger.exception("Unhandled exception running task")
39 def add(self, task, unlock, check_done):
40 if self.thread_count > 1:
50 if check_done.is_set():
52 self.task_queue.put(task, block=True, timeout=3)
63 while not self.task_queue.empty():
64 self.task_queue.get(True, .1)
69 for t in self.task_queue_threads:
70 self.task_queue.put(None)
71 for t in self.task_queue_threads: