13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / cwl / arvados_cwl / task_queue.py
1 from future import standard_library
2 standard_library.install_aliases()
3 from builtins import range
4 from builtins import object
5 # Copyright (C) The Arvados Authors. All rights reserved.
6 #
7 # SPDX-License-Identifier: Apache-2.0
8
9 import queue
10 import threading
11 import logging
12
# Module-level logger; everything logged from this file goes to the
# 'arvados.cwl-runner' logger.
logger = logging.getLogger('arvados.cwl-runner')
14
class TaskQueue(object):
    """Run callables on a pool of worker threads fed by a bounded queue.

    Attributes:
      thread_count: number of worker threads started by the constructor,
        also used as the queue's maxsize (zero or less means unbounded).
      task_queue: queue.Queue of tasks waiting for a worker.
      task_queue_threads: list of the worker threading.Thread objects.
      lock: lock held while in_flight is updated.
      in_flight: count of tasks accepted by add() that have neither
        finished running nor been discarded.
      error: exception raised by the most recently failed task, or None.
    """

    def __init__(self, lock, thread_count):
        """Create the queue and start thread_count worker threads.

        Arguments:
          lock: lock object (usable as a context manager) guarding in_flight.
          thread_count: number of workers to start.  When it is 1 or less,
            add() runs tasks inline in the calling thread instead.
        """
        self.thread_count = thread_count
        self.task_queue = queue.Queue(maxsize=self.thread_count)
        self.task_queue_threads = []
        self.lock = lock
        self.in_flight = 0
        self.error = None

        for _ in range(0, self.thread_count):
            t = threading.Thread(target=self.task_queue_func)
            self.task_queue_threads.append(t)
            t.start()

    def task_queue_func(self):
        """Worker loop: run queued tasks until a None sentinel arrives.

        An Exception raised by a task is logged and stored in self.error;
        the worker then carries on with the next task.
        """
        while True:
            task = self.task_queue.get()
            if task is None:
                return
            try:
                task()
            except Exception as e:
                logger.exception("Unhandled exception running task")
                self.error = e
            finally:
                # Always account for the finished task, even when it raised
                # something that is not an Exception subclass and the worker
                # is about to die; otherwise in_flight would never reach
                # zero again.
                with self.lock:
                    self.in_flight -= 1

    def add(self, task, unlock, check_done):
        """Submit a task, or run it inline when there is at most one thread.

        Arguments:
          task: zero-argument callable to execute.
          unlock: lock that is released while waiting for queue space and
            re-acquired before returning.
          check_done: event-like object; once is_set() returns true the task
            is abandoned instead of being queued.
        """
        if self.thread_count > 1:
            with self.lock:
                self.in_flight += 1
        else:
            task()
            return

        while True:
            try:
                unlock.release()
                if check_done.is_set():
                    # The task will never be queued, so undo the increment
                    # above rather than leaving a phantom in-flight task.
                    with self.lock:
                        self.in_flight -= 1
                    return
                # Use a timeout so check_done is re-examined periodically
                # while the queue stays full.
                self.task_queue.put(task, block=True, timeout=3)
                return
            except queue.Full:
                pass
            finally:
                unlock.acquire()

    def drain(self):
        """Discard every task still waiting in the queue.

        Discarded tasks will never run, so they are removed from the
        in_flight count as well (self.lock is taken briefly to do so).
        """
        discarded = 0
        try:
            while True:
                # None entries are shutdown sentinels, not counted tasks.
                if self.task_queue.get_nowait() is not None:
                    discarded += 1
        except queue.Empty:
            pass

        if discarded:
            with self.lock:
                self.in_flight -= discarded

    def join(self):
        """Ask every worker to exit and wait for all of them to finish."""
        # One None sentinel per worker; each worker consumes exactly one.
        for _ in self.task_queue_threads:
            self.task_queue.put(None)
        for t in self.task_queue_threads:
            t.join()