Merge branch '13076-r-autogen-api'
[arvados.git] / sdk / cwl / tests / test_tq.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import functools
6 import mock
7 import sys
8 import unittest
9 import json
10 import logging
11 import os
12 import threading
13
14 from arvados_cwl.task_queue import TaskQueue
15
16 def success_task():
17     pass
18
19 def fail_task():
20     raise Exception("Testing error handling")
21
22 class TestTaskQueue(unittest.TestCase):
23     def test_tq(self):
24         tq = TaskQueue(threading.Lock(), 2)
25
26         self.assertIsNone(tq.error)
27
28         tq.add(success_task)
29         tq.add(success_task)
30         tq.add(success_task)
31         tq.add(success_task)
32
33         tq.join()
34
35         self.assertIsNone(tq.error)
36
37
38     def test_tq_error(self):
39         tq = TaskQueue(threading.Lock(), 2)
40
41         self.assertIsNone(tq.error)
42
43         tq.add(success_task)
44         tq.add(success_task)
45         tq.add(fail_task)
46         tq.add(success_task)
47
48         tq.join()
49
50         self.assertIsNotNone(tq.error)