12134: Minimize multiprocessing worker startups.
authorTom Clegg <tclegg@veritasgenetics.com>
Fri, 18 Aug 2017 19:43:49 +0000 (15:43 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Tue, 29 Aug 2017 13:29:29 +0000 (09:29 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

services/fuse/tests/integration_test.py
services/fuse/tests/mount_test_base.py
services/fuse/tests/test_exec.py

index 6a0c5de8992b856f4692376acb38d634f2250791..3c11fa2920654edeb1e3412dee60e350f23c64dd 100644 (file)
@@ -17,12 +17,8 @@ import sys
 import tempfile
 import unittest
 
-_pool = None
-
-
 @atexit.register
 def _pool_cleanup():
-    global _pool
     if _pool is None:
         return
     _pool.close()
@@ -36,6 +32,17 @@ def wrap_static_test_method(modName, clsName, funcName, args, kwargs):
     Test().runTest(*args, **kwargs)
 
 
+# To avoid Python's threading+multiprocessing=deadlock problems, we
+# use a single global pool with maxtasksperchild=None for the entire
+# test suite.
+_pool = None
+def workerPool():
+    global _pool
+    if _pool is None:
+        _pool = multiprocessing.Pool(processes=1, maxtasksperchild=None)
+    return _pool
+
+
 class IntegrationTest(unittest.TestCase):
     def pool_test(self, *args, **kwargs):
         """Run a static method as a unit test, in a different process.
@@ -43,13 +50,10 @@ class IntegrationTest(unittest.TestCase):
         If called by method 'foobar', the static method '_foobar' of
         the same class will be called in the other process.
         """
-        global _pool
-        if _pool is None:
-            _pool = multiprocessing.Pool(1, maxtasksperchild=1)
         modName = inspect.getmodule(self).__name__
         clsName = self.__class__.__name__
         funcName = inspect.currentframe().f_back.f_code.co_name
-        _pool.apply(
+        workerPool().apply(
             wrap_static_test_method,
             (modName, clsName, '_'+funcName, args, kwargs))
 
index 8518d8bbead64b34b0ebfcb8308e18bdc6c6c6d7..d476fc771b4c0d5ad890d902cd41810f62f6fe3c 100644 (file)
@@ -21,6 +21,8 @@ import unittest
 
 logger = logging.getLogger('arvados.arv-mount')
 
+from .integration_test import workerPool
+
 class MountTestBase(unittest.TestCase):
     def setUp(self, api=None, local_store=True):
         # The underlying C implementation of open() makes a fstat() syscall
@@ -30,8 +32,8 @@ class MountTestBase(unittest.TestCase):
         # deadlocks.  The workaround is to run some of our test code in a
         # separate process.  Forturnately the multiprocessing module makes this
         # relatively easy.
-        self.pool = multiprocessing.Pool(1)
 
+        self.pool = workerPool()
         if local_store:
             self.keeptmp = tempfile.mkdtemp()
             os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
@@ -89,8 +91,6 @@ class MountTestBase(unittest.TestCase):
             shutil.rmtree(self.keeptmp)
             os.environ.pop('KEEP_LOCAL_STORE')
         run_test_server.reset()
-        self.pool.close()
-        self.pool.join()
 
     def assertDirContents(self, subdir, expect_content):
         path = self.mounttmp
index 85cb5e46989f3133feee187ebf2d9114401ddf55..ab6e13136b02fc45331106441272ee2c2ba6f982 100644 (file)
@@ -10,6 +10,8 @@ import run_test_server
 import tempfile
 import unittest
 
+from .integration_test import workerPool
+
 try:
     from shlex import quote
 except:
@@ -44,16 +46,13 @@ class ExecMode(unittest.TestCase):
     def setUp(self):
         self.mnt = tempfile.mkdtemp()
         _, self.okfile = tempfile.mkstemp()
-        self.pool = multiprocessing.Pool(1)
 
     def tearDown(self):
-        self.pool.terminate()
-        self.pool.join()
         os.rmdir(self.mnt)
         os.unlink(self.okfile)
 
     def test_exec(self):
-        self.pool.apply(try_exec, (self.mnt, [
+        workerPool().apply(try_exec, (self.mnt, [
             'sh', '-c',
             'echo -n foo >{}; cp {} {}'.format(
                 quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),