self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
def make_mount(self, root_class, **root_kwargs):
- self.operations = fuse.Operations(os.getuid(), os.getgid())
+ self.operations = fuse.Operations(os.getuid(), os.getgid(), enable_write=True)
self.operations.inodes.add_entry(root_class(
llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
llfuse.init(self.operations, self.mounttmp, [])
return self.operations.inodes[llfuse.ROOT_INODE]
def tearDown(self):
- self.pool.close()
+ self.pool.terminate()
+ self.pool.join()
del self.pool
# llfuse.close is buggy, so use fusermount instead.
m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
- # See note in FuseWriteFileTest
+ # See note in MountTestBase.setUp
self.pool.apply(fuseUpdateFileTestHelper, (self.mounttmp,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
with collection2.open("file1.txt", "w") as f:
f.write("foo")
- # See comment in FuseWriteFileTest
+ # See note in MountTestBase.setUp
self.pool.apply(fuseFileConflictTestHelper, (self.mounttmp,))
with llfuse.lock:
m.new_collection(collection.api_response(), collection)
- # See comment in FuseWriteFileTest
+ # See note in MountTestBase.setUp
self.pool.apply(fuseUnlinkOpenFileTest, (self.mounttmp,))
self.assertEqual(collection.manifest_text(), "")
m = self.make_mount(fuse.MagicDirectory)
- # See comment in FuseWriteFileTest
+ # See note in MountTestBase.setUp
self.pool.apply(fuseMvFileBetweenCollectionsTest1, (self.mounttmp,
collection1.manifest_locator(),
collection2.manifest_locator()))
m = self.make_mount(fuse.MagicDirectory)
- # See comment in FuseWriteFileTest
+ # See note in MountTestBase.setUp
self.pool.apply(fuseMvDirBetweenCollectionsTest1, (self.mounttmp,
collection1.manifest_locator(),
collection2.manifest_locator()))