projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
16480: Merge branch 'master'
[arvados.git]
/
sdk
/
cwl
/
arvados_cwl
/
task_queue.py
diff --git
a/sdk/cwl/arvados_cwl/task_queue.py
b/sdk/cwl/arvados_cwl/task_queue.py
index 018172b591b761b90825c9160b366b28a35d6b94..d75fec6c63e719949d6f19b7d2813d9f828262a6 100644
(file)
--- a/
sdk/cwl/arvados_cwl/task_queue.py
+++ b/
sdk/cwl/arvados_cwl/task_queue.py
@@
-2,7
+2,12
@@
#
# SPDX-License-Identifier: Apache-2.0
#
# SPDX-License-Identifier: Apache-2.0
-import Queue
+from future import standard_library
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
+
+import queue
import threading
import logging
import threading
import logging
@@
-11,48
+16,48
@@
logger = logging.getLogger('arvados.cwl-runner')
class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
- self.task_queue =
Q
ueue.Queue(maxsize=self.thread_count)
+ self.task_queue =
q
ueue.Queue(maxsize=self.thread_count)
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
self.error = None
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
self.error = None
- for r in
x
range(0, self.thread_count):
+ for r in range(0, self.thread_count):
t = threading.Thread(target=self.task_queue_func)
self.task_queue_threads.append(t)
t.start()
def task_queue_func(self):
t = threading.Thread(target=self.task_queue_func)
self.task_queue_threads.append(t)
t.start()
def task_queue_func(self):
+ while True:
+ task = self.task_queue.get()
+ if task is None:
+ return
+ try:
+ task()
+ except Exception as e:
+ logger.exception("Unhandled exception running task")
+ self.error = e
- while True:
- task = self.task_queue.get()
- if task is None:
- return
- try:
- task()
- except Exception as e:
- logger.exception("Unhandled exception running task")
- self.error = e
-
- with self.lock:
- self.in_flight -= 1
+ with self.lock:
+ self.in_flight -= 1
def add(self, task, unlock, check_done):
def add(self, task, unlock, check_done):
-
with self.lock
:
-
if self.thread_count > 1
:
+
if self.thread_count > 1
:
+
with self.lock
:
self.in_flight += 1
self.in_flight += 1
-
else:
-
task()
-
return
+ else:
+ task()
+ return
while True:
try:
unlock.release()
while True:
try:
unlock.release()
- self.task_queue.put(task, block=True, timeout=3)
- return
- except Queue.Full:
if check_done.is_set():
return
if check_done.is_set():
return
+ self.task_queue.put(task, block=True, timeout=3)
+ return
+ except queue.Full:
+ pass
finally:
unlock.acquire()
finally:
unlock.acquire()
@@
-62,7
+67,7
@@
class TaskQueue(object):
# Drain queue
while not self.task_queue.empty():
self.task_queue.get(True, .1)
# Drain queue
while not self.task_queue.empty():
self.task_queue.get(True, .1)
- except
Q
ueue.Empty:
+ except
q
ueue.Empty:
pass
def join(self):
pass
def join(self):