import tempfile
import unittest
-_pool = None
-
-
@atexit.register
def _pool_cleanup():
+ # Registered with atexit: closes the module-level pool if one exists,
+ # and does nothing when no pool has been created.
- global _pool
if _pool is None:
return
_pool.close()
Test().runTest(*args, **kwargs)
+# To avoid Python's threading+multiprocessing=deadlock problems, we
+# use a single global pool with maxtasksperchild=None for the entire
+# test suite.
+_pool = None
+def workerPool():
+ # Return the module-level multiprocessing pool, creating it on the
+ # first call (one worker process, maxtasksperchild=None). Every later
+ # call returns that same pool object.
+ global _pool
+ if _pool is None:
+ _pool = multiprocessing.Pool(processes=1, maxtasksperchild=None)
+ return _pool
+
+
class IntegrationTest(unittest.TestCase):
def pool_test(self, *args, **kwargs):
"""Run a static method as a unit test, in a different process.
If called by method 'foobar', the static method '_foobar' of
the same class will be called in the other process.
"""
- global _pool
- if _pool is None:
- _pool = multiprocessing.Pool(1, maxtasksperchild=1)
modName = inspect.getmodule(self).__name__
clsName = self.__class__.__name__
+ # f_back is the frame of whichever method called pool_test; its code
+ # name, prefixed with '_', selects the static method to run.
funcName = inspect.currentframe().f_back.f_code.co_name
+ # apply() blocks until the worker has finished the call.
- _pool.apply(
+ workerPool().apply(
wrap_static_test_method,
(modName, clsName, '_'+funcName, args, kwargs))
logger = logging.getLogger('arvados.arv-mount')
+from .integration_test import workerPool
+
class MountTestBase(unittest.TestCase):
def setUp(self, api=None, local_store=True):
# The underlying C implementation of open() makes a fstat() syscall
# deadlocks. The workaround is to run some of our test code in a
# separate process. Fortunately the multiprocessing module makes this
# relatively easy.
- self.pool = multiprocessing.Pool(1)
+ self.pool = workerPool()
if local_store:
self.keeptmp = tempfile.mkdtemp()
os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
shutil.rmtree(self.keeptmp)
os.environ.pop('KEEP_LOCAL_STORE')
run_test_server.reset()
- self.pool.close()
- self.pool.join()
def assertDirContents(self, subdir, expect_content):
path = self.mounttmp
import tempfile
import unittest
+from .integration_test import workerPool
+
try:
from shlex import quote
except:
def setUp(self):
+ # Create a scratch directory (self.mnt) and a scratch file
+ # (self.okfile) for the test to use.
self.mnt = tempfile.mkdtemp()
+ # NOTE(review): mkstemp() also returns an open file descriptor, which
+ # is discarded here without being closed -- confirm this is intended.
_, self.okfile = tempfile.mkstemp()
- self.pool = multiprocessing.Pool(1)
def tearDown(self):
- self.pool.terminate()
- self.pool.join()
+ # Remove the scratch directory and scratch file held in self.mnt and
+ # self.okfile. os.rmdir only succeeds on an empty directory.
os.rmdir(self.mnt)
os.unlink(self.okfile)
def test_exec(self):
- self.pool.apply(try_exec, (self.mnt, [
+ workerPool().apply(try_exec, (self.mnt, [
'sh', '-c',
'echo -n foo >{}; cp {} {}'.format(
quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),