1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
13 from unittest import mock
15 from cwltool.task_queue import TaskQueue
# Unconditionally raises a generic Exception; presumably this is the failure
# that test_tq_error injects into the queue via fail_task (verify).
21 raise Exception("Testing error handling")
# Tests for cwltool's TaskQueue: one scenario that queues only success_task
# and expects no error, and one that also queues fail_task and expects the
# queue to report an error.
23 class TestTaskQueue(unittest.TestCase):
# Build a queue from a fresh lock and the value 2
# (presumably the worker-thread count; confirm against cwltool.task_queue).
25 tq = TaskQueue(threading.Lock(), 2)
# A newly created queue must not report an error.
27 self.assertIsNone(tq.error)
# Lock and event that are handed to every add() call below.
29 unlock = threading.Lock()
31 check_done = threading.Event()
# Queue the same task four times. success_task is defined outside this
# block; presumably it completes without raising (verify).
33 tq.add(success_task, unlock, check_done)
34 tq.add(success_task, unlock, check_done)
35 tq.add(success_task, unlock, check_done)
36 tq.add(success_task, unlock, check_done)
# Only success_task was queued, so the queue must still report no error.
40 self.assertIsNone(tq.error)
# Error scenario: same setup as above, but one of the four queued tasks is
# fail_task, and the queue is expected to expose an error afterwards.
43 def test_tq_error(self):
# Same construction as the success scenario (lock plus the value 2).
44 tq = TaskQueue(threading.Lock(), 2)
# No error may be recorded before anything has been queued.
46 self.assertIsNone(tq.error)
# Lock and event that are handed to every add() call below.
48 unlock = threading.Lock()
50 check_done = threading.Event()
# The third queued task is fail_task; the other three are success_task.
# fail_task is defined outside this block; presumably it raises (verify).
52 tq.add(success_task, unlock, check_done)
53 tq.add(success_task, unlock, check_done)
54 tq.add(fail_task, unlock, check_done)
55 tq.add(success_task, unlock, check_done)
# The queue must now expose a non-None error attribute.
59 self.assertIsNotNone(tq.error)