3 import arvados_fuse.command
27 def wrap_static_test_method(modName, clsName, funcName, args, kwargs):
28 class Test(unittest.TestCase):
29 def runTest(self, *args, **kwargs):
30 getattr(getattr(sys.modules[modName], clsName), funcName)(self, *args, **kwargs)
31 Test().runTest(*args, **kwargs)
34 class IntegrationTest(unittest.TestCase):
35 def pool_test(self, *args, **kwargs):
36 """Run a static method as a unit test, in a different process.
38 If called by method 'foobar', the static method '_foobar' of
39 the same class will be called in the other process.
43 _pool = multiprocessing.Pool(1, maxtasksperchild=1)
44 modName = inspect.getmodule(self).__name__
45 clsName = self.__class__.__name__
46 funcName = inspect.currentframe().f_back.f_code.co_name
48 wrap_static_test_method,
49 (modName, clsName, '_'+funcName, args, kwargs))
54 run_test_server.run_keep(enforce_permissions=True, num_servers=2)
57 def tearDownClass(cls):
58 run_test_server.stop_keep(num_servers=2)
61 self.mnt = tempfile.mkdtemp()
62 run_test_server.authorize_with('active')
63 self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
67 run_test_server.reset()
71 """Decorator. Sets up a FUSE mount at self.mnt with the given args."""
73 @functools.wraps(func)
74 def wrapper(self, *args, **kwargs):
75 with arvados_fuse.command.Mount(
76 arvados_fuse.command.ArgumentParser().parse_args(
77 argv + ['--foreground',
78 '--unmount-timeout=0.1',
80 return func(self, *args, **kwargs)