Merge branch '19847-cwl-disk-cache-size' refs #19847
[arvados.git] / services / fuse / tests / integration_test.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 import arvados
7 import arvados_fuse
8 import arvados_fuse.command
9 import atexit
10 import functools
11 import inspect
12 import logging
13 import multiprocessing
14 import os
15 from . import run_test_server
16 import signal
17 import sys
18 import tempfile
19 import unittest
20
@atexit.register
def _pool_cleanup():
    """Close and join the shared worker pool at interpreter exit.

    Does nothing when no pool was ever created (``_pool`` is still None).
    """
    pool = _pool
    if pool is not None:
        # close() stops new work from being submitted; join() then waits
        # for the worker process to finish.
        pool.close()
        pool.join()
27
28
def wrap_static_test_method(modName, clsName, funcName, args, kwargs):
    """Run a function found by name, passing it a throwaway TestCase.

    The function is looked up as ``sys.modules[modName].<clsName>.<funcName>``
    and called with a fresh ``unittest.TestCase`` instance as its first
    argument, followed by ``*args`` and ``**kwargs``.  Everything is
    passed by name/value rather than as a bound method, so the call can
    be described with plain strings, a tuple and a dict.
    """
    class Test(unittest.TestCase):
        def runTest(self, *call_args, **call_kwargs):
            # Resolve the target at call time, in whichever process is
            # actually executing this function.
            owner = getattr(sys.modules[modName], clsName)
            target = getattr(owner, funcName)
            target(self, *call_args, **call_kwargs)

    case = Test()
    case.runTest(*args, **kwargs)
34
35
36 # To avoid Python's threading+multiprocessing=deadlock problems, we
37 # use a single global pool with maxtasksperchild=None for the entire
38 # test suite.
_pool = None


def workerPool():
    """Return the module-wide worker pool, creating it on first use.

    The pool has a single worker process and no per-child task limit
    (``maxtasksperchild=None``), and is cached in the module global
    ``_pool`` so later calls return the same object.
    """
    global _pool
    if _pool is not None:
        return _pool
    _pool = multiprocessing.Pool(processes=1, maxtasksperchild=None)
    return _pool
45
46
class IntegrationTest(unittest.TestCase):
    """Shared fixture for FUSE integration tests.

    Each test gets a fresh temporary directory in ``self.mnt`` to use as
    a mount point.  The ``mount`` decorator mounts arvados_fuse there
    for the duration of a test method, and ``pool_test`` runs the body
    of a test in the shared worker process.
    """

    def pool_test(self, *args, **kwargs):
        """Run a static method as a unit test, in a different process.

        If called by method 'foobar', the static method '_foobar' of
        the same class will be called in the other process.
        """
        modName = inspect.getmodule(self).__name__
        clsName = self.__class__.__name__
        # The target name is derived from the *calling* frame, so this
        # must be invoked directly from the test method itself.
        funcName = inspect.currentframe().f_back.f_code.co_name
        workerPool().apply(
            wrap_static_test_method,
            (modName, clsName, '_'+funcName, args, kwargs))

    @classmethod
    def setUpClass(cls):
        """Start the test services: run_test_server.run(), then two Keep
        servers with blob signing enabled."""
        run_test_server.run()
        run_test_server.run_keep(blob_signing=True, num_servers=2)

    @classmethod
    def tearDownClass(cls):
        """Stop the two Keep servers started by setUpClass."""
        run_test_server.stop_keep(num_servers=2)

    def setUp(self):
        """Create the mount point directory and authorize as 'active'."""
        self.mnt = tempfile.mkdtemp()
        run_test_server.authorize_with('active')

    def tearDown(self):
        """Remove the mount point directory and call run_test_server.reset().

        The reset runs even when removing the directory fails (for
        example because it is still busy), so one failed cleanup does
        not skip the reset for the tests that follow.  Any error from
        os.rmdir still propagates to the caller.
        """
        try:
            os.rmdir(self.mnt)
        finally:
            run_test_server.reset()

    @staticmethod
    def mount(argv):
        """Decorator. Sets up a FUSE mount at self.mnt with the given args.

        ``argv`` is a list of command line arguments for arvados_fuse;
        '--foreground', '--unmount-timeout=2' and the mount point are
        appended to it.  The Mount context is entered before the wrapped
        method runs and exited when it returns or raises.
        """
        def decorator(func):
            @functools.wraps(func)
            def wrapper(self, *args, **kwargs):
                # On the instance this shadows the class-level decorator
                # and holds the active Mount object while the test runs.
                self.mount = None
                try:
                    with arvados_fuse.command.Mount(
                            arvados_fuse.command.ArgumentParser().parse_args(
                                argv + ['--foreground',
                                        '--unmount-timeout=2',
                                        self.mnt])) as self.mount:
                        return func(self, *args, **kwargs)
                finally:
                    # If the llfuse thread is still running after the
                    # context has exited, kill this process outright
                    # rather than risk hanging the whole suite.
                    if self.mount and self.mount.llfuse_thread.is_alive():
                        logging.warning("IntegrationTest.mount:"
                                        " llfuse thread still alive after umount"
                                        " -- killing test suite to avoid deadlock")
                        os.kill(os.getpid(), signal.SIGKILL)
            return wrapper
        return decorator