3 import arvados_fuse.command
10 import run_test_server
28 def wrap_static_test_method(modName, clsName, funcName, args, kwargs):
29 class Test(unittest.TestCase):
30 def runTest(self, *args, **kwargs):
31 getattr(getattr(sys.modules[modName], clsName), funcName)(self, *args, **kwargs)
32 Test().runTest(*args, **kwargs)
35 class IntegrationTest(unittest.TestCase):
36 def pool_test(self, *args, **kwargs):
37 """Run a static method as a unit test, in a different process.
39 If called by method 'foobar', the static method '_foobar' of
40 the same class will be called in the other process.
44 _pool = multiprocessing.Pool(1, maxtasksperchild=1)
45 modName = inspect.getmodule(self).__name__
46 clsName = self.__class__.__name__
47 funcName = inspect.currentframe().f_back.f_code.co_name
49 wrap_static_test_method,
50 (modName, clsName, '_'+funcName, args, kwargs))
55 run_test_server.run_keep(enforce_permissions=True, num_servers=2)
58 def tearDownClass(cls):
59 run_test_server.stop_keep(num_servers=2)
62 self.mnt = tempfile.mkdtemp()
63 run_test_server.authorize_with('active')
64 self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
68 run_test_server.reset()
72 """Decorator. Sets up a FUSE mount at self.mnt with the given args."""
74 @functools.wraps(func)
75 def wrapper(self, *args, **kwargs):
78 with arvados_fuse.command.Mount(
79 arvados_fuse.command.ArgumentParser().parse_args(
80 argv + ['--foreground',
81 '--unmount-timeout=2',
82 self.mnt])) as self.mount:
83 return func(self, *args, **kwargs)
85 if self.mount and self.mount.llfuse_thread.is_alive():
86 logging.warning("IntegrationTest.mount:"
87 " llfuse thread still alive after umount"
88 " -- killing test suite to avoid deadlock")
89 os.kill(os.getpid(), signal.SIGKILL)