import logging
import multiprocessing
import run_test_server
+import mock
-logger = logging.getLogger('arvados.arv-mount')
+from mount_test_base import MountTestBase
-class MountTestBase(unittest.TestCase):
- def setUp(self):
- # The underlying C implementation of open() makes a fstat() syscall
- # with the GIL still held. When the GETATTR message comes back to
- # llfuse (which in these tests is in the same interpreter process) it
- # can't acquire the GIL, so it can't service the fstat() call, so it
- # deadlocks. The workaround is to run some of our test code in a
- # separate process. Fortunately the multiprocessing module makes this
- # relatively easy.
- self.pool = multiprocessing.Pool(1)
-
- self.keeptmp = tempfile.mkdtemp()
- os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
- self.mounttmp = tempfile.mkdtemp()
- run_test_server.run()
- run_test_server.authorize_with("admin")
- self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
-
- def make_mount(self, root_class, **root_kwargs):
- self.operations = fuse.Operations(os.getuid(), os.getgid(), enable_write=True)
- self.operations.inodes.add_entry(root_class(
- llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
- llfuse.init(self.operations, self.mounttmp, [])
- threading.Thread(None, llfuse.main).start()
- # wait until the driver is finished initializing
- self.operations.initlock.wait()
- return self.operations.inodes[llfuse.ROOT_INODE]
-
- def tearDown(self):
- self.pool.terminate()
- self.pool.join()
- del self.pool
-
- # llfuse.close is buggy, so use fusermount instead.
- #llfuse.close(unmount=True)
-
- count = 0
- success = 1
- while (count < 9 and success != 0):
- success = subprocess.call(["fusermount", "-u", self.mounttmp])
- time.sleep(0.1)
- count += 1
-
- self.operations.destroy()
-
- os.rmdir(self.mounttmp)
- shutil.rmtree(self.keeptmp)
- run_test_server.reset()
-
- def assertDirContents(self, subdir, expect_content):
- path = self.mounttmp
- if subdir:
- path = os.path.join(path, subdir)
- self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(path)))
+logger = logging.getLogger('arvados.arv-mount')
class FuseMountTest(MountTestBase):
cw.write("data 1")
cw.start_new_file('thing2.txt')
cw.write("data 2")
- cw.start_new_stream('dir1')
+ cw.start_new_stream('dir1')
cw.start_new_file('thing3.txt')
cw.write("data 3")
cw.start_new_file('thing4.txt')
cw.write("data 8")
cw.start_new_stream('edgecases')
- for f in ":/./../.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/\x01\\/ ".split("/"):
cw.start_new_file(f)
cw.write('x')
- for f in ":/../.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/\x01\\/ ".split("/"):
cw.start_new_stream('edgecases/dirs/' + f)
cw.start_new_file('x/x')
cw.write('x')
self.assertDirContents('dir2', ['thing5.txt', 'thing6.txt', 'dir3'])
self.assertDirContents('dir2/dir3', ['thing7.txt', 'thing8.txt'])
self.assertDirContents('edgecases',
- "dirs/:/_/__/.../-/*/\x01\\/ ".split("/"))
+ "dirs/:/.../-/*/\x01\\/ ".split("/"))
self.assertDirContents('edgecases/dirs',
- ":/__/.../-/*/\x01\\/ ".split("/"))
+ ":/.../-/*/\x01\\/ ".split("/"))
files = {'thing1.txt': 'data 1',
'thing2.txt': 'data 2',
class FuseMagicTest(MountTestBase):
- def setUp(self):
- super(FuseMagicTest, self).setUp()
+ def setUp(self, api=None):
+ super(FuseMagicTest, self).setUp(api=api)
cw = arvados.CollectionWriter()
cw.write("data 1")
self.testcollection = cw.finish()
- self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
+ self.test_manifest = cw.manifest_text()
+ self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
def runTest(self):
self.make_mount(fuse.MagicDirectory)
self.pool.apply(fuseProjectMvTestHelper1, (self.mounttmp,))
+def fuseFsyncTestHelper(mounttmp, k):
+    """Open the entry named *k* under *mounttmp* read-only and fsync it.
+
+    Callers hand this function to ``pool.apply`` so it runs in a worker
+    process.  Errors raised by ``os.open`` or ``os.fsync`` propagate
+    unchanged to the caller.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            fd = os.open(os.path.join(mounttmp, k), os.O_RDONLY)
+            try:
+                os.fsync(fd)
+            finally:
+                # Release the descriptor even when fsync raises, so a
+                # failing run does not leak it in the worker process.
+                os.close(fd)
+
+    Test().runTest()
+
+class FuseFsyncTest(FuseMagicTest):
+    """Mount a MagicDirectory and fsync the entry named by self.testcollection."""
+
+    def runTest(self):
+        self.make_mount(fuse.MagicDirectory)
+        # The open/fsync/close sequence is dispatched through self.pool
+        # instead of being executed directly in this test method.
+        run_in_worker = self.pool.apply
+        run_in_worker(fuseFsyncTestHelper,
+                      (self.mounttmp, self.testcollection))
+
+
+class MagicDirApiError(FuseMagicTest):
+    """Listing a magic-directory entry fails on the first API error, then works."""
+
+    def setUp(self):
+        fake_api = mock.MagicMock()
+        super(MagicDirApiError, self).setUp(api=fake_api)
+        # The first collections().get().execute() call raises; the second
+        # returns the manifest text stored by the parent setUp.
+        responses = [Exception('API fail'),
+                     {"manifest_text": self.test_manifest}]
+        fake_api.collections().get().execute.side_effect = iter(responses)
+        fake_api.keep.get.side_effect = Exception('Keep fail')
+
+    def runTest(self):
+        self.make_mount(fuse.MagicDirectory)
+
+        cache = self.operations.inodes.inode_cache
+        cache.cap = 1
+        cache.min_entries = 2
+
+        target = os.path.join(self.mounttmp, self.testcollection)
+        with self.assertRaises(OSError):
+            llfuse.listdir(target)
+
+        # The same listing is repeated and must not raise this time.
+        llfuse.listdir(target)
+
+
class FuseUnitTest(unittest.TestCase):
def test_sanitize_filename(self):
acceptable = [