import logging
import multiprocessing
import run_test_server
+import mock
+import re
from mount_test_base import MountTestBase
logger = logging.getLogger('arvados.arv-mount')
class AssertWithTimeout(object):
    """Allow some time for an assertion to pass.

    Iterating over an instance yields the bound ``attempt`` method until
    one call to it completes without raising AssertionError, so a check
    can be retried from a plain ``for`` loop::

        for attempt in AssertWithTimeout(10):
            attempt(self.assertIn, 'name', os.listdir(path))

    The loop body runs again on every retry, so the arguments passed to
    ``attempt`` are re-evaluated each time.

    Arguments:
      timeout -- seconds after which a failing assertion is re-raised
                 instead of being retried (default 0).
      interval -- seconds to sleep after a failed attempt (default 0.1).
    """

    def __init__(self, timeout=0, interval=0.1):
        self.timeout = timeout
        self.interval = interval

    def __iter__(self):
        # The deadline is measured from the start of iteration, not from
        # construction, so one instance can be iterated more than once.
        self.deadline = time.time() + self.timeout
        self.done = False
        return self

    def next(self):
        """Return the attempt callable, or stop once an attempt has passed."""
        if self.done:
            raise StopIteration
        return self.attempt

    # Python 3 names the iterator protocol method __next__; provide both
    # spellings so the class iterates under either major version.
    __next__ = next

    def attempt(self, fn, *args, **kwargs):
        """Call fn(*args, **kwargs), tolerating AssertionError until the deadline.

        An AssertionError raised after the deadline is re-raised; any other
        exception type propagates immediately.
        """
        try:
            fn(*args, **kwargs)
        except AssertionError:
            if time.time() > self.deadline:
                raise
            time.sleep(self.interval)
        else:
            self.done = True
+
+
class FuseMountTest(MountTestBase):
def setUp(self):
super(FuseMountTest, self).setUp()
cw.write("data 1")
cw.start_new_file('thing2.txt')
cw.write("data 2")
- cw.start_new_stream('dir1')
+ cw.start_new_stream('dir1')
cw.start_new_file('thing3.txt')
cw.write("data 3")
cw.start_new_file('thing4.txt')
cw.write("data 8")
cw.start_new_stream('edgecases')
- for f in ":/./../.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/\x01\\/ ".split("/"):
cw.start_new_file(f)
cw.write('x')
- for f in ":/../.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/\x01\\/ ".split("/"):
cw.start_new_stream('edgecases/dirs/' + f)
cw.start_new_file('x/x')
cw.write('x')
self.assertDirContents('dir2', ['thing5.txt', 'thing6.txt', 'dir3'])
self.assertDirContents('dir2/dir3', ['thing7.txt', 'thing8.txt'])
self.assertDirContents('edgecases',
- "dirs/:/_/__/.../-/*/\x01\\/ ".split("/"))
+ "dirs/:/.../-/*/\x01\\/ ".split("/"))
self.assertDirContents('edgecases/dirs',
- ":/__/.../-/*/\x01\\/ ".split("/"))
+ ":/.../-/*/\x01\\/ ".split("/"))
files = {'thing1.txt': 'data 1',
'thing2.txt': 'data 2',
class FuseMagicTest(MountTestBase):
- def setUp(self):
- super(FuseMagicTest, self).setUp()
+ def setUp(self, api=None):
+ super(FuseMagicTest, self).setUp(api=api)
cw = arvados.CollectionWriter()
cw.write("data 1")
self.testcollection = cw.finish()
- self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
+ self.test_manifest = cw.manifest_text()
+ self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
def runTest(self):
self.make_mount(fuse.MagicDirectory)
bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
self.tag_collection(bar_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertIn('fuse_test_tag', llfuse.listdir(self.mounttmp))
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertIn, 'fuse_test_tag', llfuse.listdir(self.mounttmp))
self.assertDirContents('fuse_test_tag', [bar_uuid])
baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
l = self.tag_collection(baz_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid, baz_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid, baz_uuid])
self.api.links().delete(uuid=l['uuid']).execute()
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
class FuseSharedTest(MountTestBase):
# check mtime on template
st = os.stat(pipeline_template_path)
- self.assertEqual(st.st_mtime, 1397493304)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1397493304)
# check mtime on collection
st = os.stat(os.path.join(
self.mounttmp,
'FUSE User',
'collection #1 owned by FUSE'))
- self.assertEqual(st.st_mtime, 1391448174)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1391448174)
class FuseHomeTest(MountTestBase):
self.assertNotIn("file1.txt", collection)
+ self.assertEqual(0, self.operations.write_counter.get())
self.pool.apply(fuseWriteFileTestHelperWriteFile, (self.mounttmp,))
+ self.assertEqual(12, self.operations.write_counter.get())
with collection.open("file1.txt") as f:
self.assertEqual(f.read(), "Hello world!")
+ self.assertEqual(0, self.operations.read_counter.get())
self.pool.apply(fuseWriteFileTestHelperReadFile, (self.mounttmp,))
+ self.assertEqual(12, self.operations.read_counter.get())
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertRegexpMatches(collection2["manifest_text"],
with llfuse.lock:
m.new_collection(collection.api_response(), collection)
- self.operations.listen_for_events(self.api)
+ self.operations.listen_for_events()
d1 = llfuse.listdir(os.path.join(self.mounttmp))
self.assertEqual([], sorted(d1))
with collection2.open("file1.txt", "w") as f:
f.write("foo")
- time.sleep(1)
-
- # should show up via event bus notify
-
- d1 = llfuse.listdir(os.path.join(self.mounttmp))
- self.assertEqual(["file1.txt"], sorted(d1))
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertEqual, ["file1.txt"], llfuse.listdir(os.path.join(self.mounttmp)))
def fuseFileConflictTestHelper(mounttmp):
self.pool.apply(fuseProjectMvTestHelper1, (self.mounttmp,))
def fuseFsyncTestHelper(mounttmp, k):
    """Open the entry named k under mounttmp read-only, fsync it, and close it.

    The work is wrapped in a throwaway TestCase, following the pattern of
    the other helpers in this file. Any OSError raised by open or fsync
    propagates to the caller.
    """
    class Test(unittest.TestCase):
        def runTest(self):
            fd = os.open(os.path.join(mounttmp, k), os.O_RDONLY)
            try:
                os.fsync(fd)
            finally:
                # Release the descriptor even when fsync raises.
                os.close(fd)

    Test().runTest()
+
class FuseFsyncTest(FuseMagicTest):
    """Run the fsync helper against the test collection in a MagicDirectory mount."""

    def runTest(self):
        self.make_mount(fuse.MagicDirectory)
        # The helper is dispatched through self.pool rather than called directly.
        helper_args = (self.mounttmp, self.testcollection)
        self.pool.apply(fuseFsyncTestHelper, helper_args)
+
+
class MagicDirApiError(FuseMagicTest):
    """Check that a failed collection lookup in a MagicDirectory can be retried."""

    def setUp(self):
        mock_api = mock.MagicMock()
        super(MagicDirApiError, self).setUp(api=mock_api)
        # Configured after the parent setUp, which needs self.test_manifest
        # to exist: the first collections().get().execute() call raises and
        # the second returns the test manifest.
        responses = [Exception('API fail'), {"manifest_text": self.test_manifest}]
        mock_api.collections().get().execute.side_effect = iter(responses)
        mock_api.keep.get.side_effect = Exception('Keep fail')

    def runTest(self):
        self.make_mount(fuse.MagicDirectory)

        # NOTE(review): presumably shrinks the inode cache so the failed
        # entry is not retained between the two listings -- confirm.
        cache = self.operations.inodes.inode_cache
        cache.cap = 1
        cache.min_entries = 2

        collection_dir = os.path.join(self.mounttmp, self.testcollection)

        # The first listing must surface the failure as OSError.
        with self.assertRaises(OSError):
            llfuse.listdir(collection_dir)

        # The second listing must complete without raising.
        llfuse.listdir(collection_dir)
+
+
class FuseUnitTest(unittest.TestCase):
def test_sanitize_filename(self):
acceptable = [
self.assertEqual("_", fuse.sanitize_filename(""))
self.assertEqual("_", fuse.sanitize_filename("."))
self.assertEqual("__", fuse.sanitize_filename(".."))
+
+
class FuseMagicTestPDHOnly(MountTestBase):
    """Check MagicDirectory lookups by PDH and by UUID, with and without pdh_only."""

    def setUp(self, api=None):
        """Create a one-file collection and record its locator, manifest and UUID."""
        super(FuseMagicTestPDHOnly, self).setUp(api=api)

        cw = arvados.CollectionWriter()

        cw.start_new_file('thing1.txt')
        cw.write("data 1")

        self.testcollection = cw.finish()
        self.test_manifest = cw.manifest_text()
        created = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
        self.testcollectionuuid = str(created['uuid'])

    def verify_pdh_only(self, pdh_only=False, skip_pdh_only=False):
        """Mount a MagicDirectory and check which lookups succeed.

        Arguments:
          pdh_only -- value passed to make_mount as pdh_only; when true,
                      looking the collection up by UUID must raise OSError.
          skip_pdh_only -- when true, mount without passing pdh_only at all.
        """
        if skip_pdh_only:
            self.make_mount(fuse.MagicDirectory)  # in this case, the default by_id applies
        else:
            self.make_mount(fuse.MagicDirectory, pdh_only=pdh_only)

        # A freshly mounted directory lists README and no collection entries.
        mount_ls = llfuse.listdir(self.mounttmp)
        self.assertIn('README', mount_ls)
        self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
                             arvados.util.uuid_pattern.match(fn)
                             for fn in mount_ls),
                         "new FUSE MagicDirectory lists Collection")

        # look up using pdh should succeed in all cases
        self.assertDirContents(self.testcollection, ['thing1.txt'])
        self.assertDirContents(os.path.join('by_id', self.testcollection),
                               ['thing1.txt'])
        mount_ls = llfuse.listdir(self.mounttmp)
        self.assertIn('README', mount_ls)
        self.assertIn(self.testcollection, mount_ls)
        self.assertIn(self.testcollection,
                      llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))

        # Keys are relative to the mount point; the mount path is joined on
        # exactly once, when the file is opened.
        files = {os.path.join(self.testcollection, 'thing1.txt'): 'data 1'}

        for k, v in files.items():
            with open(os.path.join(self.mounttmp, k)) as f:
                self.assertEqual(v, f.read())

        # look up using uuid should fail when pdh_only is set
        uuid_path = os.path.join('by_id', self.testcollectionuuid)
        if pdh_only:
            with self.assertRaises(OSError):
                self.assertDirContents(uuid_path, ['thing1.txt'])
        else:
            self.assertDirContents(uuid_path, ['thing1.txt'])

    def test_with_pdh_only_true(self):
        self.verify_pdh_only(pdh_only=True)

    def test_with_pdh_only_false(self):
        self.verify_pdh_only(pdh_only=False)

    def test_with_default_by_id(self):
        self.verify_pdh_only(skip_pdh_only=True)