projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
14886: Simplifies test mocking
[arvados.git]
/
sdk
/
cwl
/
arvados_cwl
/
task_queue.py
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index b9fd09807b452c1b06738ef1a7df72fd9dcc8708..d75fec6c63e719949d6f19b7d2813d9f828262a6 100644
(file)
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -2,7 +2,12 @@
#
# SPDX-License-Identifier: Apache-2.0
#
# SPDX-License-Identifier: Apache-2.0
-import Queue
+from future import standard_library
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
+
+import queue
import threading
import logging
import threading
import logging
@@ -11,46 +16,58 @@
logger = logging.getLogger('arvados.cwl-runner')
class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
- self.task_queue = Queue.Queue()
+ self.task_queue = queue.Queue(maxsize=self.thread_count)
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
self.error = None
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
self.error = None
- for r in xrange(0, self.thread_count):
+ for r in range(0, self.thread_count):
t = threading.Thread(target=self.task_queue_func)
self.task_queue_threads.append(t)
t.start()
def task_queue_func(self):
t = threading.Thread(target=self.task_queue_func)
self.task_queue_threads.append(t)
t.start()
def task_queue_func(self):
+ while True:
+ task = self.task_queue.get()
+ if task is None:
+ return
+ try:
+ task()
+ except Exception as e:
+ logger.exception("Unhandled exception running task")
+ self.error = e
- while True:
- task = self.task_queue.get()
- if task is None:
- return
- try:
- task()
- except Exception as e:
- logger.exception("Unhandled exception running task")
- self.error = e
-
- with self.lock:
- self.in_flight -= 1
-
- def add(self, task):
- with self.lock:
- if self.thread_count > 1:
+ with self.lock:
+ self.in_flight -= 1
+
+ def add(self, task, unlock, check_done):
+ if self.thread_count > 1:
+ with self.lock:
self.in_flight += 1
self.in_flight += 1
- self.task_queue.put(task)
- else:
- task()
+ else:
+ task()
+ return
+
+ while True:
+ try:
+ unlock.release()
+ if check_done.is_set():
+ return
+ self.task_queue.put(task, block=True, timeout=3)
+ return
+ except queue.Full:
+ pass
+ finally:
+ unlock.acquire()
+
def drain(self):
try:
# Drain queue
while not self.task_queue.empty():
self.task_queue.get(True, .1)
def drain(self):
try:
# Drain queue
while not self.task_queue.empty():
self.task_queue.get(True, .1)
- except Queue.Empty:
+ except queue.Empty:
pass
def join(self):
pass
def join(self):