16417: Creates Loki's S3 bucket and access resources.
[arvados.git] / sdk / cwl / tests / test_tq.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import functools
6 import sys
7 import unittest
8 import json
9 import logging
10 import os
11 import threading
12
13 from unittest import mock
14
15 from cwltool.task_queue import TaskQueue
16
def success_task():
    """Task callable that does nothing and completes without raising."""
    return None
19
def fail_task():
    """Task callable that always raises, used to exercise error handling."""
    failure = Exception("Testing error handling")
    raise failure
22
class TestTaskQueue(unittest.TestCase):
    """Checks that TaskQueue.error reflects whether any queued task raised."""

    def _run_through_queue(self, tasks):
        """Feed ``tasks`` to a fresh TaskQueue, join it, and return its ``error``.

        The queue is built as ``TaskQueue(threading.Lock(), 2)``.
        NOTE(review): the ``2`` is presumably the worker thread count --
        confirm against cwltool.
        """
        queue = TaskQueue(threading.Lock(), 2)
        try:
            # A freshly constructed queue must not report an error yet.
            self.assertIsNone(queue.error)

            # The lock is handed to add() already acquired, and the event is
            # never set by this test.
            held_lock = threading.Lock()
            held_lock.acquire()
            done_event = threading.Event()

            for task in tasks:
                queue.add(task, held_lock, done_event)
        finally:
            # Always join, even when an assertion or add() fails above.
            queue.join()

        return queue.error

    def test_tq(self):
        """Four succeeding tasks leave ``error`` unset."""
        outcome = self._run_through_queue(
            [success_task, success_task, success_task, success_task]
        )
        self.assertIsNone(outcome)

    def test_tq_error(self):
        """One failing task among succeeding ones sets ``error``."""
        outcome = self._run_through_queue(
            [success_task, success_task, fail_task, success_task]
        )
        self.assertIsNotNone(outcome)