def _remove(self, obj, clear):
if clear and not obj.clear():
- _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
+ _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
return False
self._total -= obj.cache_size
del self._entries[obj.cache_priority]
if obj.cache_uuid:
del self._by_uuid[obj.cache_uuid]
obj.cache_uuid = None
- _logger.debug("Cleared %s total now %i", obj, self._total)
+ if clear:
+ _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
return True
def cap_cache(self):
- _logger.debug("total is %i cap is %i", self._total, self.cap)
+ #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
if self._total > self.cap:
for key in list(self._entries.keys()):
if self._total < self.cap or len(self._entries) < self.min_entries:
if obj.cache_uuid:
self._by_uuid[obj.cache_uuid] = obj
self._total += obj.objsize()
- _logger.debug("Managing %s total now %i", obj, self._total)
+ _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
self.cap_cache()
else:
obj.cache_priority = None
if obj.cache_priority in self._entries:
self._remove(obj, False)
self.manage(obj)
- _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
def unmanage(self, obj):
if obj.persisted() and obj.cache_priority in self._entries:
if not p.writable():
raise llfuse.FUSEError(errno.EPERM)
- if not isinstance(p, CollectionDirectoryBase):
- raise llfuse.FUSEError(errno.EPERM)
-
return p
@catch_exceptions
p = self._check_writable(inode_parent)
with llfuse.lock_released:
- p.collection.open(name, "w")
+ p.create(name)
# The file entry should have been implicitly created by callback.
f = p[name]
p = self._check_writable(inode_parent)
with llfuse.lock_released:
- p.collection.mkdirs(name)
+ p.mkdir(name)
# The dir entry should have been implicitly created by callback.
d = p[name]
p = self._check_writable(inode_parent)
with llfuse.lock_released:
- p.collection.remove(name)
+ p.unlink(name)
+ @catch_exceptions
def rmdir(self, inode_parent, name):
- self.unlink(inode_parent, name)
+ _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
+ p = self._check_writable(inode_parent)
+
+ with llfuse.lock_released:
+ p.rmdir(name)
@catch_exceptions
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
dest = self._check_writable(inode_parent_new)
with llfuse.lock_released:
- dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
- dest.flush()
- src.flush()
+ dest.rename(name_old, name_new, src)
@catch_exceptions
def flush(self, fh):
import arvados
import apiclient
import functools
+import threading
from fusefile import StringFile, ObjectFile, FuseArvadosFile
from fresh import FreshBase, convertTime, use_counter
try:
self.update()
except apiclient.errors.HttpError as e:
- _logger.debug(e)
+ _logger.warn(e)
@use_counter
def __getitem__(self, item):
self._entries[name] = oldentries[name]
del oldentries[name]
else:
+ _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
# create new directory entry
ent = new_entry(i)
if ent is not None:
# delete any other directory entries that were not in found in 'items'
for i in oldentries:
+ _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
llfuse.invalidate_entry(self.inode, str(i))
self.inodes.del_entry(oldentries[i])
changed = True
def flush(self):
pass
+ def create(self):
+ raise NotImplementedError()
+
+ def mkdir(self):
+ raise NotImplementedError()
+
+ def unlink(self):
+ raise NotImplementedError()
+
+ def rmdir(self):
+ raise NotImplementedError()
+
+ def rename(self):
+ raise NotImplementedError()
+
class CollectionDirectoryBase(Directory):
def __init__(self, parent_inode, inodes, collection):
super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
def flush(self):
self.collection.root_collection().save()
+ def create(self, name):
+ self.collection.open(name, "w").close()
+
+ def mkdir(self, name):
+ self.collection.mkdirs(name)
+
+ def unlink(self, name):
+ self.collection.remove(name)
+
+ def rmdir(self, name):
+ self.collection.remove(name)
+
+ def rename(self, name_old, name_new, src):
+ if not isinstance(src, CollectionDirectoryBase):
+ raise llfuse.FUSEError(errno.EPERM)
+ self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
+ self.flush()
+ src.flush()
+
+
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree holding a collection."""
self._manifest_size = 0
if self.collection_locator:
self._writable = (uuid_pattern.match(self.collection_locator) is not None)
+ self._updating_lock = threading.Lock()
def same(self, i):
return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
self.fresh()
return True
- with llfuse.lock_released:
- _logger.debug("Updating %s", self.collection_locator)
- if self.collection:
- self.collection.update()
- else:
- if uuid_pattern.match(self.collection_locator):
- coll_reader = arvados.collection.Collection(
- self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ try:
+ with llfuse.lock_released:
+ self._updating_lock.acquire()
+ if not self.stale():
+ return
+
+ _logger.debug("Updating %s", self.collection_locator)
+ if self.collection:
+ self.collection.update()
else:
- coll_reader = arvados.collection.CollectionReader(
- self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
- new_collection_record = coll_reader.api_response() or {}
- # If the Collection only exists in Keep, there will be no API
- # response. Fill in the fields we need.
- if 'uuid' not in new_collection_record:
- new_collection_record['uuid'] = self.collection_locator
- if "portable_data_hash" not in new_collection_record:
- new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
- if 'manifest_text' not in new_collection_record:
- new_collection_record['manifest_text'] = coll_reader.manifest_text()
-
- if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
- self.new_collection(new_collection_record, coll_reader)
-
- self._manifest_size = len(coll_reader.manifest_text())
- _logger.debug("%s manifest_size %i", self, self._manifest_size)
- # end with llfuse.lock_released, re-acquire lock
+ if uuid_pattern.match(self.collection_locator):
+ coll_reader = arvados.collection.Collection(
+ self.collection_locator, self.api, self.api.keep,
+ num_retries=self.num_retries)
+ else:
+ coll_reader = arvados.collection.CollectionReader(
+ self.collection_locator, self.api, self.api.keep,
+ num_retries=self.num_retries)
+ new_collection_record = coll_reader.api_response() or {}
+ # If the Collection only exists in Keep, there will be no API
+ # response. Fill in the fields we need.
+ if 'uuid' not in new_collection_record:
+ new_collection_record['uuid'] = self.collection_locator
+ if "portable_data_hash" not in new_collection_record:
+ new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+ if 'manifest_text' not in new_collection_record:
+ new_collection_record['manifest_text'] = coll_reader.manifest_text()
+
+ if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
+ self.new_collection(new_collection_record, coll_reader)
+
+ self._manifest_size = len(coll_reader.manifest_text())
+ _logger.debug("%s manifest_size %i", self, self._manifest_size)
+ # end with llfuse.lock_released, re-acquire lock
- self.fresh()
- return True
+ self.fresh()
+ return True
+ finally:
+ self._updating_lock.release()
except arvados.errors.NotFoundError:
_logger.exception("arv-mount %s: error", self.collection_locator)
except arvados.errors.ArgumentError as detail:
self.num_retries = num_retries
self.project_object = project_object
self.project_object_file = None
- self.uuid = project_object['uuid']
+ self.project_uuid = project_object['uuid']
self._poll = poll
self._poll_time = poll_time
+ self._updating_lock = threading.Lock()
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
return None
def uuid(self):
- return self.uuid
+ return self.project_uuid
def update(self):
if self.project_object_file == None:
return None
def samefn(a, i):
- if isinstance(a, CollectionDirectory):
- return a.collection_locator == i['uuid']
- elif isinstance(a, ProjectDirectory):
- return a.uuid == i['uuid']
+ if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
+ return a.uuid() == i['uuid']
elif isinstance(a, ObjectFile):
- return a.uuid == i['uuid'] and not a.stale()
+ return a.uuid() == i['uuid'] and not a.stale()
return False
- with llfuse.lock_released:
- if group_uuid_pattern.match(self.uuid):
- self.project_object = self.api.groups().get(
- uuid=self.uuid).execute(num_retries=self.num_retries)
- elif user_uuid_pattern.match(self.uuid):
- self.project_object = self.api.users().get(
- uuid=self.uuid).execute(num_retries=self.num_retries)
+ try:
+ with llfuse.lock_released:
+ self._updating_lock.acquire()
+ if not self.stale():
+ return
- contents = arvados.util.list_all(self.api.groups().contents,
- self.num_retries, uuid=self.uuid)
+ if group_uuid_pattern.match(self.project_uuid):
+ self.project_object = self.api.groups().get(
+ uuid=self.project_uuid).execute(num_retries=self.num_retries)
+ elif user_uuid_pattern.match(self.project_uuid):
+ self.project_object = self.api.users().get(
+ uuid=self.project_uuid).execute(num_retries=self.num_retries)
- # end with llfuse.lock_released, re-acquire lock
+ contents = arvados.util.list_all(self.api.groups().contents,
+ self.num_retries, uuid=self.project_uuid)
+
+ # end with llfuse.lock_released, re-acquire lock
- self.merge(contents,
- namefn,
- samefn,
- self.createDirectory)
+ self.merge(contents,
+ namefn,
+ samefn,
+ self.createDirectory)
+ finally:
+ self._updating_lock.release()
def __getitem__(self, item):
self.checkupdate()
return super(ProjectDirectory, self).__contains__(k)
def persisted(self):
- return False
+ return True
class SharedDirectory(Directory):
for r in root_owners:
if r in objects:
obr = objects[r]
- if "name" in obr:
+ if obr.get("name"):
contents[obr["name"]] = obr
- if "first_name" in obr:
+ #elif obr.get("username"):
+ # contents[obr["username"]] = obr
+ elif "first_name" in obr:
contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
+
for r in roots:
if r['owner_uuid'] not in objects:
contents[r['name']] = r
try:
self.merge(contents.items(),
lambda i: i[0],
- lambda a, i: a.uuid == i[1]['uuid'],
+ lambda a, i: a.uuid() == i[1]['uuid'],
lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception:
_logger.exception()
run_test_server.run()
run_test_server.authorize_with("admin")
self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+ self.pool = multiprocessing.Pool(1)
def make_mount(self, root_class, **root_kwargs):
self.operations = fuse.Operations(os.getuid(), os.getgid())
os.rmdir(self.mounttmp)
shutil.rmtree(self.keeptmp)
run_test_server.reset()
+ self.pool.close()
def assertDirContents(self, subdir, expect_content):
path = self.mounttmp
# to the current user)
shared_dirs = llfuse.listdir(self.mounttmp)
shared_dirs.sort()
- self.assertIn('FUSE User', shared_dirs)
+ self.assertIn('FUSE', shared_dirs)
# fuse_user_objs is a list of the objects owned by the FUSE
# test user (which present as files in the 'FUSE User'
# directory)
- fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
+ fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE'))
fuse_user_objs.sort()
self.assertEqual(['FUSE Test Project', # project owned by user
'collection #1 owned by FUSE', # collection owned by user
], fuse_user_objs)
# test_proj_files is a list of the files in the FUSE Test Project.
- test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
+ test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE', 'FUSE Test Project'))
test_proj_files.sort()
self.assertEqual(['collection in FUSE project',
'pipeline instance in FUSE project.pipelineInstance',
# and that its contents are what we expect.
pipeline_template_path = os.path.join(
self.mounttmp,
- 'FUSE User',
+ 'FUSE',
'FUSE Test Project',
'pipeline template in FUSE project.pipelineTemplate')
with open(pipeline_template_path) as f:
# check mtime on collection
st = os.stat(os.path.join(
self.mounttmp,
- 'FUSE User',
+ 'FUSE',
'collection #1 owned by FUSE'))
self.assertEqual(st.st_mtime, 1391448174)
d3 = llfuse.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)
-class FuseUpdateFileTest(MountTestBase):
+
+def fuseModifyFileTestHelper1(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ d1 = llfuse.listdir(mounttmp)
+ self.assertEqual(["file1.txt"], d1)
+ with open(os.path.join(mounttmp, "file1.txt")) as f:
+ self.assertEqual("blub", f.read())
+ Test().runTest()
+
+def fuseModifyFileTestHelper2(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ d1 = llfuse.listdir(mounttmp)
+ self.assertEqual(["file1.txt"], d1)
+ with open(os.path.join(mounttmp, "file1.txt")) as f:
+ self.assertEqual("plnp", f.read())
+ Test().runTest()
+
+class FuseModifyFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
with collection.open("file1.txt", "w") as f:
with llfuse.lock:
m.new_collection(collection.api_response(), collection)
- d1 = llfuse.listdir(self.mounttmp)
- self.assertEqual(["file1.txt"], d1)
- with open(os.path.join(self.mounttmp, "file1.txt")) as f:
- self.assertEqual("blub", f.read())
+ self.pool.apply(fuseModifyFileTestHelper1, (self.mounttmp,))
with collection.open("file1.txt", "w") as f:
f.write("plnp")
- d1 = llfuse.listdir(self.mounttmp)
- self.assertEqual(["file1.txt"], d1)
- with open(os.path.join(self.mounttmp, "file1.txt")) as f:
- self.assertEqual("plnp", f.read())
+ self.pool.apply(fuseModifyFileTestHelper2, (self.mounttmp,))
class FuseAddFileToCollectionTest(MountTestBase):
def runTest(self):
self.assertRegexpMatches(collection2["manifest_text"],
r'\. d41d8cd98f00b204e9800998ecf8427e\+0\+A[a-f0-9]{40}@[a-f0-9]{8} 0:0:file1\.txt$')
-def fuseWriteFileTestHelper(mounttmp):
+def fuseWriteFileTestHelper1(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+ f.write("Hello world!")
+ Test().runTest()
+
+def fuseWriteFileTestHelper2(mounttmp):
class Test(unittest.TestCase):
def runTest(self):
with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
class FuseWriteFileTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
self.assertNotIn("file1.txt", collection)
- with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
- f.write("Hello world!")
-
- with collection.open("file1.txt") as f:
- self.assertEqual(f.read(), "Hello world!")
-
# We can't just open the collection for reading because the underlying
# C implementation of open() makes a fstat() syscall with the GIL still
# held. When the GETATTR message comes back to llfuse (which in these
# so it can't service the fstat() call, so it deadlocks. The
# workaround is to run some of our test code in a separate process.
# Forturnately the multiprocessing module makes this relatively easy.
- pool = multiprocessing.Pool(1)
- pool.apply(fuseWriteFileTestHelper, (self.mounttmp,))
- pool.close()
+ self.pool.apply(fuseWriteFileTestHelper1, (self.mounttmp,))
+
+ with collection.open("file1.txt") as f:
+ self.assertEqual(f.read(), "Hello world!")
+
+ self.pool.apply(fuseWriteFileTestHelper2, (self.mounttmp,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertRegexpMatches(collection2["manifest_text"],
r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
-def fuseUpdateFileTestHelper1(mounttmp):
+def fuseUpdateFileTestHelper(mounttmp):
class Test(unittest.TestCase):
def runTest(self):
+ with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+ f.write("Hello world!")
+
with open(os.path.join(mounttmp, "file1.txt"), "r+") as f:
fr = f.read()
self.assertEqual(fr, "Hello world!")
f.seek(0)
fr = f.read()
self.assertEqual(fr, "Hola mundo!!")
- return True
- Test().runTest()
-def fuseUpdateFileTestHelper2(mounttmp):
- class Test(unittest.TestCase):
- def runTest(self):
with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
- return f.read() == "Hola mundo!!"
+ self.assertEqual(f.read(), "Hola mundo!!")
+
Test().runTest()
class FuseUpdateFileTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
- with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
- f.write("Hello world!")
-
# See note in FuseWriteFileTest
- pool = multiprocessing.Pool(1)
- pool.apply(fuseUpdateFileTestHelper1, (self.mounttmp,))
- pool.apply(fuseUpdateFileTestHelper2, (self.mounttmp,))
- pool.close()
+ self.pool.apply(fuseUpdateFileTestHelper, (self.mounttmp,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertRegexpMatches(collection2["manifest_text"],
class FuseMkdirTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
class FuseRmTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
class FuseMvFileTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+def fuseRenameTestHelper(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ os.mkdir(os.path.join(mounttmp, "testdir"))
+
+ with open(os.path.join(mounttmp, "testdir", "file1.txt"), "w") as f:
+ f.write("Hello world!")
+
+ Test().runTest()
+
class FuseRenameTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
- os.mkdir(os.path.join(self.mounttmp, "testdir"))
-
- with open(os.path.join(self.mounttmp, "testdir", "file1.txt"), "w") as f:
- f.write("Hello world!")
+ self.pool.apply(fuseRenameTestHelper, (self.mounttmp,))
# Starting manifest
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
class FuseUpdateFromEventTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
class FuseFileConflictTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
f.write("bar")
# See comment in FuseWriteFileTest
- pool = multiprocessing.Pool(1)
- pool.apply(fuseFileConflictTestHelper, (self.mounttmp,))
- pool.close()
+ self.pool.apply(fuseFileConflictTestHelper, (self.mounttmp,))
def fuseUnlinkOpenFileTest(mounttmp):
class FuseUnlinkOpenFileTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
m.new_collection(collection.api_response(), collection)
# See comment in FuseWriteFileTest
- pool = multiprocessing.Pool(1)
- pool.apply(fuseUnlinkOpenFileTest, (self.mounttmp,))
- pool.close()
+ self.pool.apply(fuseUnlinkOpenFileTest, (self.mounttmp,))
self.assertEqual(collection.manifest_text(), "")
class FuseMvFileBetweenCollectionsTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection1 = arvados.collection.Collection(api_client=self.api)
collection1.save_new()
m = self.make_mount(fuse.MagicDirectory)
# See comment in FuseWriteFileTest
- pool = multiprocessing.Pool(1)
- pool.apply(fuseMvFileBetweenCollectionsTest1, (self.mounttmp,
+ self.pool.apply(fuseMvFileBetweenCollectionsTest1, (self.mounttmp,
collection1.manifest_locator(),
collection2.manifest_locator()))
self.assertRegexpMatches(collection1.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$")
self.assertEqual(collection2.manifest_text(), "")
- pool.apply(fuseMvFileBetweenCollectionsTest2, (self.mounttmp,
+ self.pool.apply(fuseMvFileBetweenCollectionsTest2, (self.mounttmp,
collection1.manifest_locator(),
collection2.manifest_locator()))
- pool.close()
collection1.update()
collection2.update()
class FuseMvDirBetweenCollectionsTest(MountTestBase):
def runTest(self):
- arvados.logger.setLevel(logging.DEBUG)
-
collection1 = arvados.collection.Collection(api_client=self.api)
collection1.save_new()
m = self.make_mount(fuse.MagicDirectory)
# See comment in FuseWriteFileTest
- pool = multiprocessing.Pool(1)
- pool.apply(fuseMvDirBetweenCollectionsTest1, (self.mounttmp,
+ self.pool.apply(fuseMvDirBetweenCollectionsTest1, (self.mounttmp,
collection1.manifest_locator(),
collection2.manifest_locator()))
self.assertRegexpMatches(collection1.manifest_text(), r"\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$")
self.assertEqual(collection2.manifest_text(), "")
- pool.apply(fuseMvDirBetweenCollectionsTest2, (self.mounttmp,
+ self.pool.apply(fuseMvDirBetweenCollectionsTest2, (self.mounttmp,
collection1.manifest_locator(),
collection2.manifest_locator()))
- pool.close()
collection1.update()
collection2.update()