Merge branch '17449-gxp-docs' refs #17449
[arvados.git] / sdk / cwl / tests / test_tq.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import functools
6 import mock
7 import sys
8 import unittest
9 import json
10 import logging
11 import os
12 import threading
13
14 from cwltool.task_queue import TaskQueue
15
def success_task():
    """Task that completes without raising; it does nothing and returns None."""
    return None
18
def fail_task():
    """Task that always raises ``Exception`` with a fixed message."""
    message = "Testing error handling"
    raise Exception(message)
21
class TestTaskQueue(unittest.TestCase):
    """Exercise ``TaskQueue`` with all-successful tasks and with one failing task."""

    def test_tq(self):
        """Four succeeding tasks leave ``error`` unset before and after ``join()``."""
        task_queue = TaskQueue(threading.Lock(), 2)
        try:
            self.assertIsNone(task_queue.error)

            # add() is handed a lock this thread already holds and an event
            # that is never set in this test.
            # NOTE(review): presumably add() releases the lock while it waits
            # for queue space -- confirm against cwltool.task_queue.
            held_lock = threading.Lock()
            held_lock.acquire()
            done_event = threading.Event()

            for task in (success_task, success_task, success_task, success_task):
                task_queue.add(task, held_lock, done_event)
        finally:
            # Join regardless of how the body above exits.
            task_queue.join()

        self.assertIsNone(task_queue.error)


    def test_tq_error(self):
        """A raising task among succeeding ones leaves ``error`` set after ``join()``."""
        task_queue = TaskQueue(threading.Lock(), 2)
        try:
            self.assertIsNone(task_queue.error)

            # Same setup as the success case: a lock held by this thread and
            # an event that is never set.
            held_lock = threading.Lock()
            held_lock.acquire()
            done_event = threading.Event()

            # The third task submitted is the one that raises.
            for task in (success_task, success_task, fail_task, success_task):
                task_queue.add(task, held_lock, done_event)
        finally:
            # Join regardless of how the body above exits.
            task_queue.join()

        self.assertIsNotNone(task_queue.error)