3 import arvados_fuse.command
11 import run_test_server
def wrap_static_test_method(modName, clsName, funcName, args, kwargs):
    """Call a class attribute as the body of a throwaway unittest.TestCase.

    The callable is addressed purely by name (module, class, attribute),
    so this function and its arguments can be handed to another process,
    which is how pool_test() uses it with a multiprocessing pool.

    Args:
        modName: dotted name of the module that defines the class.
        clsName: name of the class inside that module.
        funcName: name of the attribute on the class to call. It receives
            a freshly created TestCase instance as its first argument.
        args: positional arguments forwarded after the TestCase instance.
        kwargs: keyword arguments forwarded to the callable.

    Raises:
        ImportError: if modName cannot be imported.
        AttributeError: if the class or the attribute does not exist.
        Any exception raised by the callable itself (including assertion
        failures) propagates to the caller unchanged.
    """
    # Local import: only this function needs it, and the file-level import
    # block is left alone.
    import importlib

    class Test(unittest.TestCase):
        def runTest(self, *args, **kwargs):
            # import_module() hands back the already-loaded module when
            # there is one, and imports it otherwise. A plain
            # sys.modules[modName] lookup fails with KeyError in a worker
            # process that has not imported the test module yet.
            owner = getattr(importlib.import_module(modName), clsName)
            getattr(owner, funcName)(self, *args, **kwargs)

    Test().runTest(*args, **kwargs)
20 class IntegrationTest(unittest.TestCase):
21 def pool_test(self, *args, **kwargs):
22 """Run a static method as a unit test, in a different process.
24 If called by method 'foobar', the static method '_foobar' of
25 the same class will be called in the other process.
27 modName = inspect.getmodule(self).__name__
28 clsName = self.__class__.__name__
29 funcName = inspect.currentframe().f_back.f_code.co_name
30 pool = multiprocessing.Pool(1)
33 wrap_static_test_method,
34 (modName, clsName, '_'+funcName, args, kwargs))
42 run_test_server.run_keep(enforce_permissions=True, num_servers=2)
def tearDownClass(cls):
    """Class-level teardown: ask run_test_server to stop two Keep servers."""
    # Same server count that is passed to run_test_server.run_keep().
    keep_server_count = 2
    run_test_server.stop_keep(num_servers=keep_server_count)
49 self.mnt = tempfile.mkdtemp()
50 run_test_server.authorize_with('active')
51 self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
55 run_test_server.reset()
59 """Decorator. Sets up a FUSE mount at self.mnt with the given args."""
61 @functools.wraps(func)
62 def wrapper(self, *args, **kwargs):
63 with arvados_fuse.command.Mount(
64 arvados_fuse.command.ArgumentParser().parse_args(
65 argv + ['--foreground',
66 '--unmount-timeout=0.1',
68 return func(self, *args, **kwargs)