import os
import threading
-from arvados_cwl.task_queue import TaskQueue
+from cwltool.task_queue import TaskQueue
def success_task():
    """No-op task: returns ``None`` without raising, standing in for a job that succeeds."""
    pass
class TestTaskQueue(unittest.TestCase):
    """Checks that ``TaskQueue`` reports task failures through its ``error`` attribute."""

    def test_tq(self):
        """Queue only succeeding tasks and expect ``tq.error`` to stay ``None``."""
        # NOTE(review): the second argument is presumably the worker thread
        # count -- confirm against the TaskQueue constructor.
        tq = TaskQueue(threading.Lock(), 2)
        try:
            self.assertIsNone(tq.error)

            # The lock is handed to add() already held by this thread.
            # NOTE(review): presumably add() releases it while waiting for
            # queue space and re-acquires it before returning -- confirm
            # against cwltool.task_queue.
            unlock = threading.Lock()
            unlock.acquire()
            check_done = threading.Event()

            tq.add(success_task, unlock, check_done)
            tq.add(success_task, unlock, check_done)
            tq.add(success_task, unlock, check_done)
            tq.add(success_task, unlock, check_done)
        finally:
            # Always join, even if an assertion or add() raised above.
            tq.join()

        self.assertIsNone(tq.error)

    def test_tq_error(self):
        """Queue one failing task among succeeding ones and expect ``tq.error`` to be set."""
        tq = TaskQueue(threading.Lock(), 2)
        try:
            self.assertIsNone(tq.error)

            # Same calling convention as test_tq: the lock is held when it is
            # passed to add().
            unlock = threading.Lock()
            unlock.acquire()
            check_done = threading.Event()

            tq.add(success_task, unlock, check_done)
            tq.add(success_task, unlock, check_done)
            tq.add(fail_task, unlock, check_done)
            tq.add(success_task, unlock, check_done)
        finally:
            # Always join, even if an assertion or add() raised above.
            tq.join()

        self.assertIsNotNone(tq.error)