1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
14 from arvados_cwl.task_queue import TaskQueue
20 raise Exception("Testing error handling")
22 class TestTaskQueue(unittest.TestCase):
24 tq = TaskQueue(threading.Lock(), 2)
26 self.assertIsNone(tq.error)
28 unlock = threading.Lock()
30 check_done = threading.Event()
32 tq.add(success_task, unlock, check_done)
33 tq.add(success_task, unlock, check_done)
34 tq.add(success_task, unlock, check_done)
35 tq.add(success_task, unlock, check_done)
39 self.assertIsNone(tq.error)
42 def test_tq_error(self):
43 tq = TaskQueue(threading.Lock(), 2)
45 self.assertIsNone(tq.error)
47 unlock = threading.Lock()
49 check_done = threading.Event()
51 tq.add(success_task, unlock, check_done)
52 tq.add(success_task, unlock, check_done)
53 tq.add(fail_task, unlock, check_done)
54 tq.add(success_task, unlock, check_done)
58 self.assertIsNotNone(tq.error)