1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import arvados_fuse.command
13 import multiprocessing
15 import run_test_server
def wrap_static_test_method(modName, clsName, funcName, args, kwargs):
    """Call a static test method with a fresh TestCase instance as `self`.

    The target is looked up by name -- attribute `funcName` of class
    `clsName` in module `modName` -- and invoked as
    ``target(testcase, *args, **kwargs)``, where `testcase` is an instance
    of a throwaway `unittest.TestCase` subclass. This lets the target use
    the usual ``assert*`` helpers even though it is not a bound method.

    Args:
        modName: dotted name of the module that defines the class.
        clsName: name of the class inside that module.
        funcName: name of the static method to call.
        args: positional arguments forwarded to the target.
        kwargs: keyword arguments forwarded to the target.

    Raises:
        ImportError: if `modName` is not loaded and cannot be imported.
        AttributeError: if the class or the method does not exist.
        Any exception raised by the target (e.g. AssertionError) propagates
        unchanged.
    """
    # Function-scope import so this block does not depend on a new
    # file-level import.
    import importlib

    class Test(unittest.TestCase):
        def runTest(self, *test_args, **test_kwargs):
            # import_module() returns the already-loaded module when there
            # is one and imports it otherwise, so the lookup also works in
            # a process where the module has not been imported yet
            # (a plain sys.modules[modName] raises KeyError there).
            module = importlib.import_module(modName)
            target = getattr(getattr(module, clsName), funcName)
            target(self, *test_args, **test_kwargs)

    Test().runTest(*args, **kwargs)
40 class IntegrationTest(unittest.TestCase):
41 def pool_test(self, *args, **kwargs):
42 """Run a static method as a unit test, in a different process.
44 If called by method 'foobar', the static method '_foobar' of
45 the same class will be called in the other process.
49 _pool = multiprocessing.Pool(1, maxtasksperchild=1)
50 modName = inspect.getmodule(self).__name__
51 clsName = self.__class__.__name__
52 funcName = inspect.currentframe().f_back.f_code.co_name
54 wrap_static_test_method,
55 (modName, clsName, '_'+funcName, args, kwargs))
60 run_test_server.run_keep(enforce_permissions=True, num_servers=2)
63 def tearDownClass(cls):
64 run_test_server.stop_keep(num_servers=2)
67 self.mnt = tempfile.mkdtemp()
68 run_test_server.authorize_with('active')
72 run_test_server.reset()
76 """Decorator. Sets up a FUSE mount at self.mnt with the given args."""
78 @functools.wraps(func)
79 def wrapper(self, *args, **kwargs):
80 # Workaround for llfuse deadlock bug. See #10805, #8345,
81 # https://bitbucket.org/nikratio/python-llfuse/issues/108
82 llfuse.close = lambda *args: None
86 with arvados_fuse.command.Mount(
87 arvados_fuse.command.ArgumentParser().parse_args(
88 argv + ['--foreground',
89 '--unmount-timeout=2',
90 self.mnt])) as self.mount:
91 return func(self, *args, **kwargs)
93 if self.mount and self.mount.llfuse_thread.is_alive():
94 logging.warning("IntegrationTest.mount:"
95 " llfuse thread still alive after umount"
96 " -- killing test suite to avoid deadlock")
97 os.kill(os.getpid(), signal.SIGKILL)