3198: Serialize updates to avoid doing redundant API server calls.
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 13 May 2015 19:36:42 +0000 (15:36 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 13 May 2015 19:36:42 +0000 (15:36 -0400)
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py
services/fuse/tests/test_mount.py

index d5350f431a391f10942b50dd654918fe4539b264..4dfa17409835132d3316e189497fb215c935496e 100644 (file)
@@ -80,18 +80,19 @@ class InodeCache(object):
 
     def _remove(self, obj, clear):
         if clear and not obj.clear():
-            _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
+            _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
             return False
         self._total -= obj.cache_size
         del self._entries[obj.cache_priority]
         if obj.cache_uuid:
             del self._by_uuid[obj.cache_uuid]
             obj.cache_uuid = None
-        _logger.debug("Cleared %s total now %i", obj, self._total)
+        if clear:
+            _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
         return True
 
     def cap_cache(self):
-        _logger.debug("total is %i cap is %i", self._total, self.cap)
+        #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
         if self._total > self.cap:
             for key in list(self._entries.keys()):
                 if self._total < self.cap or len(self._entries) < self.min_entries:
@@ -107,7 +108,7 @@ class InodeCache(object):
             if obj.cache_uuid:
                 self._by_uuid[obj.cache_uuid] = obj
             self._total += obj.objsize()
-            _logger.debug("Managing %s total now %i", obj, self._total)
+            _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
             self.cap_cache()
         else:
             obj.cache_priority = None
@@ -117,7 +118,6 @@ class InodeCache(object):
             if obj.cache_priority in self._entries:
                 self._remove(obj, False)
             self.manage(obj)
-            _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
 
     def unmanage(self, obj):
         if obj.persisted() and obj.cache_priority in self._entries:
@@ -464,9 +464,6 @@ class Operations(llfuse.Operations):
         if not p.writable():
             raise llfuse.FUSEError(errno.EPERM)
 
-        if not isinstance(p, CollectionDirectoryBase):
-            raise llfuse.FUSEError(errno.EPERM)
-
         return p
 
     @catch_exceptions
@@ -474,7 +471,7 @@ class Operations(llfuse.Operations):
         p = self._check_writable(inode_parent)
 
         with llfuse.lock_released:
-            p.collection.open(name, "w")
+            p.create(name)
 
         # The file entry should have been implicitly created by callback.
         f = p[name]
@@ -492,7 +489,7 @@ class Operations(llfuse.Operations):
         p = self._check_writable(inode_parent)
 
         with llfuse.lock_released:
-            p.collection.mkdirs(name)
+            p.mkdir(name)
 
         # The dir entry should have been implicitly created by callback.
         d = p[name]
@@ -506,10 +503,15 @@ class Operations(llfuse.Operations):
         p = self._check_writable(inode_parent)
 
         with llfuse.lock_released:
-            p.collection.remove(name)
+            p.unlink(name)
 
+    @catch_exceptions
     def rmdir(self, inode_parent, name):
-        self.unlink(inode_parent, name)
+        _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
+        p = self._check_writable(inode_parent)
+
+        with llfuse.lock_released:
+            p.rmdir(name)
 
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
@@ -518,9 +520,7 @@ class Operations(llfuse.Operations):
         dest = self._check_writable(inode_parent_new)
 
         with llfuse.lock_released:
-            dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
-            dest.flush()
-            src.flush()
+            dest.rename(name_old, name_new, src)
 
     @catch_exceptions
     def flush(self, fh):
index 16e5325264641255e07ea771e8cfa2b6af619e6d..b44950f25adff307e76f0fb2e301af1cb12c48f4 100644 (file)
@@ -5,6 +5,7 @@ import llfuse
 import arvados
 import apiclient
 import functools
+import threading
 
 from fusefile import StringFile, ObjectFile, FuseArvadosFile
 from fresh import FreshBase, convertTime, use_counter
@@ -72,7 +73,7 @@ class Directory(FreshBase):
             try:
                 self.update()
             except apiclient.errors.HttpError as e:
-                _logger.debug(e)
+                _logger.warn(e)
 
     @use_counter
     def __getitem__(self, item):
@@ -125,6 +126,7 @@ class Directory(FreshBase):
                     self._entries[name] = oldentries[name]
                     del oldentries[name]
                 else:
+                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                     # create new directory entry
                     ent = new_entry(i)
                     if ent is not None:
@@ -133,6 +135,7 @@ class Directory(FreshBase):
 
         # delete any other directory entries that were not found in 'items'
         for i in oldentries:
+            _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
             llfuse.invalidate_entry(self.inode, str(i))
             self.inodes.del_entry(oldentries[i])
             changed = True
@@ -171,6 +174,21 @@ class Directory(FreshBase):
     def flush(self):
         pass
 
+    def create(self):
+        raise NotImplementedError()
+
+    def mkdir(self):
+        raise NotImplementedError()
+
+    def unlink(self):
+        raise NotImplementedError()
+
+    def rmdir(self):
+        raise NotImplementedError()
+
+    def rename(self):
+        raise NotImplementedError()
+
 class CollectionDirectoryBase(Directory):
     def __init__(self, parent_inode, inodes, collection):
         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
@@ -221,6 +239,26 @@ class CollectionDirectoryBase(Directory):
     def flush(self):
         self.collection.root_collection().save()
 
+    def create(self, name):
+        self.collection.open(name, "w").close()
+
+    def mkdir(self, name):
+        self.collection.mkdirs(name)
+
+    def unlink(self, name):
+        self.collection.remove(name)
+
+    def rmdir(self, name):
+        self.collection.remove(name)
+
+    def rename(self, name_old, name_new, src):
+        if not isinstance(src, CollectionDirectoryBase):
+            raise llfuse.FUSEError(errno.EPERM)
+        self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
+        self.flush()
+        src.flush()
+
+
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree holding a collection."""
 
@@ -239,6 +277,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         self._manifest_size = 0
         if self.collection_locator:
             self._writable = (uuid_pattern.match(self.collection_locator) is not None)
+        self._updating_lock = threading.Lock()
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
@@ -284,38 +323,45 @@ class CollectionDirectory(CollectionDirectoryBase):
                 self.fresh()
                 return True
 
-            with llfuse.lock_released:
-                _logger.debug("Updating %s", self.collection_locator)
-                if self.collection:
-                    self.collection.update()
-                else:
-                    if uuid_pattern.match(self.collection_locator):
-                        coll_reader = arvados.collection.Collection(
-                            self.collection_locator, self.api, self.api.keep,
-                            num_retries=self.num_retries)
+            try:
+                with llfuse.lock_released:
+                    self._updating_lock.acquire()
+                    if not self.stale():
+                        return
+
+                    _logger.debug("Updating %s", self.collection_locator)
+                    if self.collection:
+                        self.collection.update()
                     else:
-                        coll_reader = arvados.collection.CollectionReader(
-                            self.collection_locator, self.api, self.api.keep,
-                            num_retries=self.num_retries)
-                    new_collection_record = coll_reader.api_response() or {}
-                    # If the Collection only exists in Keep, there will be no API
-                    # response.  Fill in the fields we need.
-                    if 'uuid' not in new_collection_record:
-                        new_collection_record['uuid'] = self.collection_locator
-                    if "portable_data_hash" not in new_collection_record:
-                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
-                    if 'manifest_text' not in new_collection_record:
-                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
-
-                    if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
-                        self.new_collection(new_collection_record, coll_reader)
-
-                    self._manifest_size = len(coll_reader.manifest_text())
-                    _logger.debug("%s manifest_size %i", self, self._manifest_size)
-            # end with llfuse.lock_released, re-acquire lock
+                        if uuid_pattern.match(self.collection_locator):
+                            coll_reader = arvados.collection.Collection(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        else:
+                            coll_reader = arvados.collection.CollectionReader(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        new_collection_record = coll_reader.api_response() or {}
+                        # If the Collection only exists in Keep, there will be no API
+                        # response.  Fill in the fields we need.
+                        if 'uuid' not in new_collection_record:
+                            new_collection_record['uuid'] = self.collection_locator
+                        if "portable_data_hash" not in new_collection_record:
+                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+                        if 'manifest_text' not in new_collection_record:
+                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
+
+                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
+                            self.new_collection(new_collection_record, coll_reader)
+
+                        self._manifest_size = len(coll_reader.manifest_text())
+                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
+                # end with llfuse.lock_released, re-acquire lock
 
-            self.fresh()
-            return True
+                self.fresh()
+                return True
+            finally:
+                self._updating_lock.release()
         except arvados.errors.NotFoundError:
             _logger.exception("arv-mount %s: error", self.collection_locator)
         except arvados.errors.ArgumentError as detail:
@@ -498,9 +544,10 @@ class ProjectDirectory(Directory):
         self.num_retries = num_retries
         self.project_object = project_object
         self.project_object_file = None
-        self.uuid = project_object['uuid']
+        self.project_uuid = project_object['uuid']
         self._poll = poll
         self._poll_time = poll_time
+        self._updating_lock = threading.Lock()
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
@@ -518,7 +565,7 @@ class ProjectDirectory(Directory):
             return None
 
     def uuid(self):
-        return self.uuid
+        return self.project_uuid
 
     def update(self):
         if self.project_object_file == None:
@@ -542,31 +589,36 @@ class ProjectDirectory(Directory):
                 return None
 
         def samefn(a, i):
-            if isinstance(a, CollectionDirectory):
-                return a.collection_locator == i['uuid']
-            elif isinstance(a, ProjectDirectory):
-                return a.uuid == i['uuid']
+            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
+                return a.uuid() == i['uuid']
             elif isinstance(a, ObjectFile):
-                return a.uuid == i['uuid'] and not a.stale()
+                return a.uuid() == i['uuid'] and not a.stale()
             return False
 
-        with llfuse.lock_released:
-            if group_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.groups().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
-            elif user_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.users().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
+        try:
+            with llfuse.lock_released:
+                self._updating_lock.acquire()
+                if not self.stale():
+                    return
 
-            contents = arvados.util.list_all(self.api.groups().contents,
-                                             self.num_retries, uuid=self.uuid)
+                if group_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.groups().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
+                elif user_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.users().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-        # end with llfuse.lock_released, re-acquire lock
+                contents = arvados.util.list_all(self.api.groups().contents,
+                                                 self.num_retries, uuid=self.project_uuid)
+
+            # end with llfuse.lock_released, re-acquire lock
 
-        self.merge(contents,
-                   namefn,
-                   samefn,
-                   self.createDirectory)
+            self.merge(contents,
+                       namefn,
+                       samefn,
+                       self.createDirectory)
+        finally:
+            self._updating_lock.release()
 
     def __getitem__(self, item):
         self.checkupdate()
@@ -582,7 +634,7 @@ class ProjectDirectory(Directory):
             return super(ProjectDirectory, self).__contains__(k)
 
     def persisted(self):
-        return False
+        return True
 
 
 class SharedDirectory(Directory):
@@ -632,11 +684,14 @@ class SharedDirectory(Directory):
             for r in root_owners:
                 if r in objects:
                     obr = objects[r]
-                    if "name" in obr:
+                    if obr.get("name"):
                         contents[obr["name"]] = obr
-                    if "first_name" in obr:
+                    #elif obr.get("username"):
+                    #    contents[obr["username"]] = obr
+                    elif "first_name" in obr:
                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
 
+
             for r in roots:
                 if r['owner_uuid'] not in objects:
                     contents[r['name']] = r
@@ -646,7 +701,7 @@ class SharedDirectory(Directory):
         try:
             self.merge(contents.items(),
                        lambda i: i[0],
-                       lambda a, i: a.uuid == i[1]['uuid'],
+                       lambda a, i: a.uuid() == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
             _logger.exception()
index d3c13f3a16a227a6dd063bfe597161757eb936ab..846e043c79c49f681451d201a99b6ea7e15eba67 100644 (file)
@@ -81,9 +81,12 @@ class ObjectFile(StringFile):
 
     def __init__(self, parent_inode, obj):
         super(ObjectFile, self).__init__(parent_inode, "", 0)
-        self.uuid = obj['uuid']
+        self.object_uuid = obj['uuid']
         self.update(obj)
 
+    def uuid(self):
+        return self.object_uuid
+
     def update(self, obj):
         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
index 7bc56b3677e4bc7bb3dfa26da425f5b1b3b74c28..540f211ba0fb737486f5dd90b8cbf3f671dcd4e6 100644 (file)
@@ -26,6 +26,7 @@ class MountTestBase(unittest.TestCase):
         run_test_server.run()
         run_test_server.authorize_with("admin")
         self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+        self.pool = multiprocessing.Pool(1)
 
     def make_mount(self, root_class, **root_kwargs):
         self.operations = fuse.Operations(os.getuid(), os.getgid())
@@ -50,6 +51,7 @@ class MountTestBase(unittest.TestCase):
         os.rmdir(self.mounttmp)
         shutil.rmtree(self.keeptmp)
         run_test_server.reset()
+        self.pool.close()
 
     def assertDirContents(self, subdir, expect_content):
         path = self.mounttmp
@@ -240,12 +242,12 @@ class FuseSharedTest(MountTestBase):
         # to the current user)
         shared_dirs = llfuse.listdir(self.mounttmp)
         shared_dirs.sort()
-        self.assertIn('FUSE User', shared_dirs)
+        self.assertIn('FUSE', shared_dirs)
 
         # fuse_user_objs is a list of the objects owned by the FUSE
         # test user (which present as files in the 'FUSE User'
         # directory)
-        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
+        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE'))
         fuse_user_objs.sort()
         self.assertEqual(['FUSE Test Project',                    # project owned by user
                           'collection #1 owned by FUSE',          # collection owned by user
@@ -254,7 +256,7 @@ class FuseSharedTest(MountTestBase):
                       ], fuse_user_objs)
 
         # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
+        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE', 'FUSE Test Project'))
         test_proj_files.sort()
         self.assertEqual(['collection in FUSE project',
                           'pipeline instance in FUSE project.pipelineInstance',
@@ -265,7 +267,7 @@ class FuseSharedTest(MountTestBase):
         # and that its contents are what we expect.
         pipeline_template_path = os.path.join(
                 self.mounttmp,
-                'FUSE User',
+                'FUSE',
                 'FUSE Test Project',
                 'pipeline template in FUSE project.pipelineTemplate')
         with open(pipeline_template_path) as f:
@@ -279,7 +281,7 @@ class FuseSharedTest(MountTestBase):
         # check mtime on collection
         st = os.stat(os.path.join(
                 self.mounttmp,
-                'FUSE User',
+                'FUSE',
                 'collection #1 owned by FUSE'))
         self.assertEqual(st.st_mtime, 1391448174)
 
@@ -315,7 +317,26 @@ class FuseHomeTest(MountTestBase):
         d3 = llfuse.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
         self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)
 
-class FuseUpdateFileTest(MountTestBase):
+
+def fuseModifyFileTestHelper1(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            d1 = llfuse.listdir(mounttmp)
+            self.assertEqual(["file1.txt"], d1)
+            with open(os.path.join(mounttmp, "file1.txt")) as f:
+                self.assertEqual("blub", f.read())
+    Test().runTest()
+
+def fuseModifyFileTestHelper2(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            d1 = llfuse.listdir(mounttmp)
+            self.assertEqual(["file1.txt"], d1)
+            with open(os.path.join(mounttmp, "file1.txt")) as f:
+                self.assertEqual("plnp", f.read())
+    Test().runTest()
+
+class FuseModifyFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
         with collection.open("file1.txt", "w") as f:
@@ -327,18 +348,12 @@ class FuseUpdateFileTest(MountTestBase):
         with llfuse.lock:
             m.new_collection(collection.api_response(), collection)
 
-        d1 = llfuse.listdir(self.mounttmp)
-        self.assertEqual(["file1.txt"], d1)
-        with open(os.path.join(self.mounttmp, "file1.txt")) as f:
-            self.assertEqual("blub", f.read())
+        self.pool.apply(fuseModifyFileTestHelper1, (self.mounttmp,))
 
         with collection.open("file1.txt", "w") as f:
             f.write("plnp")
 
-        d1 = llfuse.listdir(self.mounttmp)
-        self.assertEqual(["file1.txt"], d1)
-        with open(os.path.join(self.mounttmp, "file1.txt")) as f:
-            self.assertEqual("plnp", f.read())
+        self.pool.apply(fuseModifyFileTestHelper2, (self.mounttmp,))
 
 class FuseAddFileToCollectionTest(MountTestBase):
     def runTest(self):
@@ -413,7 +428,14 @@ class FuseCreateFileTest(MountTestBase):
         self.assertRegexpMatches(collection2["manifest_text"],
             r'\. d41d8cd98f00b204e9800998ecf8427e\+0\+A[a-f0-9]{40}@[a-f0-9]{8} 0:0:file1\.txt$')
 
-def fuseWriteFileTestHelper(mounttmp):
+def fuseWriteFileTestHelper1(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+                f.write("Hello world!")
+    Test().runTest()
+
+def fuseWriteFileTestHelper2(mounttmp):
     class Test(unittest.TestCase):
         def runTest(self):
             with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
@@ -422,8 +444,6 @@ def fuseWriteFileTestHelper(mounttmp):
 
 class FuseWriteFileTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -434,12 +454,6 @@ class FuseWriteFileTest(MountTestBase):
 
         self.assertNotIn("file1.txt", collection)
 
-        with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
-            f.write("Hello world!")
-
-        with collection.open("file1.txt") as f:
-            self.assertEqual(f.read(), "Hello world!")
-
         # We can't just open the collection for reading because the underlying
         # C implementation of open() makes a fstat() syscall with the GIL still
         # held.  When the GETATTR message comes back to llfuse (which in these
@@ -447,17 +461,23 @@ class FuseWriteFileTest(MountTestBase):
         # so it can't service the fstat() call, so it deadlocks.  The
         # workaround is to run some of our test code in a separate process.
         # Fortunately the multiprocessing module makes this relatively easy.
-        pool = multiprocessing.Pool(1)
-        pool.apply(fuseWriteFileTestHelper, (self.mounttmp,))
-        pool.close()
+        self.pool.apply(fuseWriteFileTestHelper1, (self.mounttmp,))
+
+        with collection.open("file1.txt") as f:
+            self.assertEqual(f.read(), "Hello world!")
+
+        self.pool.apply(fuseWriteFileTestHelper2, (self.mounttmp,))
 
         collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
         self.assertRegexpMatches(collection2["manifest_text"],
             r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
 
-def fuseUpdateFileTestHelper1(mounttmp):
+def fuseUpdateFileTestHelper(mounttmp):
     class Test(unittest.TestCase):
         def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
             with open(os.path.join(mounttmp, "file1.txt"), "r+") as f:
                 fr = f.read()
                 self.assertEqual(fr, "Hello world!")
@@ -466,20 +486,14 @@ def fuseUpdateFileTestHelper1(mounttmp):
                 f.seek(0)
                 fr = f.read()
                 self.assertEqual(fr, "Hola mundo!!")
-                return True
-    Test().runTest()
 
-def fuseUpdateFileTestHelper2(mounttmp):
-    class Test(unittest.TestCase):
-        def runTest(self):
             with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
-                return f.read() == "Hola mundo!!"
+                self.assertEqual(f.read(), "Hola mundo!!")
+
     Test().runTest()
 
 class FuseUpdateFileTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -488,14 +502,8 @@ class FuseUpdateFileTest(MountTestBase):
             m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
-        with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
-            f.write("Hello world!")
-
         # See note in FuseWriteFileTest
-        pool = multiprocessing.Pool(1)
-        pool.apply(fuseUpdateFileTestHelper1, (self.mounttmp,))
-        pool.apply(fuseUpdateFileTestHelper2, (self.mounttmp,))
-        pool.close()
+        self.pool.apply(fuseUpdateFileTestHelper, (self.mounttmp,))
 
         collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
         self.assertRegexpMatches(collection2["manifest_text"],
@@ -504,8 +512,6 @@ class FuseUpdateFileTest(MountTestBase):
 
 class FuseMkdirTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -539,8 +545,6 @@ class FuseMkdirTest(MountTestBase):
 
 class FuseRmTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -599,8 +603,6 @@ class FuseRmTest(MountTestBase):
 
 class FuseMvFileTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -636,10 +638,18 @@ class FuseMvFileTest(MountTestBase):
             r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
 
 
+def fuseRenameTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            os.mkdir(os.path.join(mounttmp, "testdir"))
+
+            with open(os.path.join(mounttmp, "testdir", "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+    Test().runTest()
+
 class FuseRenameTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -648,10 +658,7 @@ class FuseRenameTest(MountTestBase):
             m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
-        os.mkdir(os.path.join(self.mounttmp, "testdir"))
-
-        with open(os.path.join(self.mounttmp, "testdir", "file1.txt"), "w") as f:
-            f.write("Hello world!")
+        self.pool.apply(fuseRenameTestHelper, (self.mounttmp,))
 
         # Starting manifest
         collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
@@ -677,8 +684,6 @@ class FuseRenameTest(MountTestBase):
 
 class FuseUpdateFromEventTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -722,8 +727,6 @@ def fuseFileConflictTestHelper(mounttmp):
 
 class FuseFileConflictTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -742,9 +745,7 @@ class FuseFileConflictTest(MountTestBase):
             f.write("bar")
 
         # See comment in FuseWriteFileTest
-        pool = multiprocessing.Pool(1)
-        pool.apply(fuseFileConflictTestHelper, (self.mounttmp,))
-        pool.close()
+        self.pool.apply(fuseFileConflictTestHelper, (self.mounttmp,))
 
 
 def fuseUnlinkOpenFileTest(mounttmp):
@@ -772,8 +773,6 @@ def fuseUnlinkOpenFileTest(mounttmp):
 
 class FuseUnlinkOpenFileTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection = arvados.collection.Collection(api_client=self.api)
         collection.save_new()
 
@@ -782,9 +781,7 @@ class FuseUnlinkOpenFileTest(MountTestBase):
             m.new_collection(collection.api_response(), collection)
 
         # See comment in FuseWriteFileTest
-        pool = multiprocessing.Pool(1)
-        pool.apply(fuseUnlinkOpenFileTest, (self.mounttmp,))
-        pool.close()
+        self.pool.apply(fuseUnlinkOpenFileTest, (self.mounttmp,))
 
         self.assertEqual(collection.manifest_text(), "")
 
@@ -816,8 +813,6 @@ def fuseMvFileBetweenCollectionsTest2(mounttmp, uuid1, uuid2):
 
 class FuseMvFileBetweenCollectionsTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection1 = arvados.collection.Collection(api_client=self.api)
         collection1.save_new()
 
@@ -827,8 +822,7 @@ class FuseMvFileBetweenCollectionsTest(MountTestBase):
         m = self.make_mount(fuse.MagicDirectory)
 
         # See comment in FuseWriteFileTest
-        pool = multiprocessing.Pool(1)
-        pool.apply(fuseMvFileBetweenCollectionsTest1, (self.mounttmp,
+        self.pool.apply(fuseMvFileBetweenCollectionsTest1, (self.mounttmp,
                                                   collection1.manifest_locator(),
                                                   collection2.manifest_locator()))
 
@@ -838,10 +832,9 @@ class FuseMvFileBetweenCollectionsTest(MountTestBase):
         self.assertRegexpMatches(collection1.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$")
         self.assertEqual(collection2.manifest_text(), "")
 
-        pool.apply(fuseMvFileBetweenCollectionsTest2, (self.mounttmp,
+        self.pool.apply(fuseMvFileBetweenCollectionsTest2, (self.mounttmp,
                                                   collection1.manifest_locator(),
                                                   collection2.manifest_locator()))
-        pool.close()
 
         collection1.update()
         collection2.update()
@@ -884,8 +877,6 @@ def fuseMvDirBetweenCollectionsTest2(mounttmp, uuid1, uuid2):
 
 class FuseMvDirBetweenCollectionsTest(MountTestBase):
     def runTest(self):
-        arvados.logger.setLevel(logging.DEBUG)
-
         collection1 = arvados.collection.Collection(api_client=self.api)
         collection1.save_new()
 
@@ -895,8 +886,7 @@ class FuseMvDirBetweenCollectionsTest(MountTestBase):
         m = self.make_mount(fuse.MagicDirectory)
 
         # See comment in FuseWriteFileTest
-        pool = multiprocessing.Pool(1)
-        pool.apply(fuseMvDirBetweenCollectionsTest1, (self.mounttmp,
+        self.pool.apply(fuseMvDirBetweenCollectionsTest1, (self.mounttmp,
                                                   collection1.manifest_locator(),
                                                   collection2.manifest_locator()))
 
@@ -906,10 +896,9 @@ class FuseMvDirBetweenCollectionsTest(MountTestBase):
         self.assertRegexpMatches(collection1.manifest_text(), r"\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$")
         self.assertEqual(collection2.manifest_text(), "")
 
-        pool.apply(fuseMvDirBetweenCollectionsTest2, (self.mounttmp,
+        self.pool.apply(fuseMvDirBetweenCollectionsTest2, (self.mounttmp,
                                                   collection1.manifest_locator(),
                                                   collection2.manifest_locator()))
-        pool.close()
 
         collection1.update()
         collection2.update()