import time
import unittest
import tempfile
+import parameterized
import arvados
import arvados_fuse as fuse
else:
self.done = True
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseMountTest(MountTestBase):
def setUp(self):
super(FuseMountTest, self).setUp()
self.assertEqual(v, f.read().decode())
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseMagicTest(MountTestBase):
def setUp(self, api=None):
super(FuseMagicTest, self).setUp(api=api)
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
self.assertEqual("plnp", f.read())
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseModifyFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
self.pool.apply(fuseModifyFileTestHelperReadEndContents, (self.mounttmp,))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseAddFileToCollectionTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseRemoveFileFromCollectionTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
pass
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseCreateFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
self.assertEqual(f.read(), "Hello world!")
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseWriteFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseUpdateFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
class MagicDirApiError(FuseMagicTest):
def setUp(self):
api = mock.MagicMock()
+ api.keep.block_cache = mock.MagicMock(cache_max=1)
super(MagicDirApiError, self).setUp(api=api)
api.collections().get().execute.side_effect = iter([
Exception('API fail'),