import multiprocessing
import run_test_server
import mock
+import re
from mount_test_base import MountTestBase
logger = logging.getLogger('arvados.arv-mount')
class AssertWithTimeout(object):
    """Allow some time for an assertion to pass.

    Iterating over an instance repeatedly yields the bound ``attempt``
    method.  The loop body is expected to call it with an assertion
    callable and that callable's arguments; iteration stops after the
    first call that completes without raising AssertionError.

    Usage::

        for attempt in AssertWithTimeout(10):
            attempt(self.assertEqual, expected, compute_actual())

    The loop body is re-executed on every retry, so argument expressions
    such as ``compute_actual()`` are evaluated again each time.

    :param timeout: seconds, counted from the start of iteration, during
        which a failing assertion is retried instead of re-raised.  With
        the default of 0 the assertion gets exactly one attempt.
    """

    def __init__(self, timeout=0):
        self.timeout = timeout

    def __iter__(self):
        # The clock starts when iteration begins, not at construction,
        # so a single instance can be iterated more than once.
        self.deadline = time.time() + self.timeout
        self.done = False
        return self

    def __next__(self):
        """Return the ``attempt`` callable until an attempt has passed."""
        if self.done:
            raise StopIteration
        return self.attempt

    # Python 2 looks up ``next``; Python 3 looks up ``__next__``.
    next = __next__

    def attempt(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` once, tolerating early failures.

        An AssertionError raised before the deadline is swallowed after a
        0.1 second pause so the caller's loop can try again.  At or after
        the deadline it is re-raised.  Any other exception type propagates
        immediately.
        """
        try:
            fn(*args, **kwargs)
        except AssertionError:
            # ">=" (not ">") guarantees that timeout=0 never retries, even
            # when the clock has not advanced since __iter__ ran.
            if time.time() >= self.deadline:
                raise
            time.sleep(0.1)
        else:
            self.done = True
+
class FuseMountTest(MountTestBase):
def setUp(self):
super(FuseMountTest, self).setUp()
bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
self.tag_collection(bar_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertIn('fuse_test_tag', llfuse.listdir(self.mounttmp))
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertIn, 'fuse_test_tag', llfuse.listdir(self.mounttmp))
self.assertDirContents('fuse_test_tag', [bar_uuid])
baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
l = self.tag_collection(baz_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid, baz_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid, baz_uuid])
self.api.links().delete(uuid=l['uuid']).execute()
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
class FuseSharedTest(MountTestBase):
# check mtime on template
st = os.stat(pipeline_template_path)
- self.assertEqual(st.st_mtime, 1397493304)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1397493304)
# check mtime on collection
st = os.stat(os.path.join(
self.mounttmp,
'FUSE User',
'collection #1 owned by FUSE'))
- self.assertEqual(st.st_mtime, 1391448174)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1391448174)
class FuseHomeTest(MountTestBase):
self.assertNotIn("file1.txt", collection)
+ self.assertEqual(0, self.operations.write_counter.get())
self.pool.apply(fuseWriteFileTestHelperWriteFile, (self.mounttmp,))
+ self.assertEqual(12, self.operations.write_counter.get())
with collection.open("file1.txt") as f:
self.assertEqual(f.read(), "Hello world!")
+ self.assertEqual(0, self.operations.read_counter.get())
self.pool.apply(fuseWriteFileTestHelperReadFile, (self.mounttmp,))
+ self.assertEqual(12, self.operations.read_counter.get())
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertRegexpMatches(collection2["manifest_text"],
with llfuse.lock:
m.new_collection(collection.api_response(), collection)
- self.operations.listen_for_events(self.api)
+ self.operations.listen_for_events()
d1 = llfuse.listdir(os.path.join(self.mounttmp))
self.assertEqual([], sorted(d1))
with collection2.open("file1.txt", "w") as f:
f.write("foo")
- time.sleep(1)
-
- # should show up via event bus notify
-
- d1 = llfuse.listdir(os.path.join(self.mounttmp))
- self.assertEqual(["file1.txt"], sorted(d1))
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertEqual, ["file1.txt"], llfuse.listdir(os.path.join(self.mounttmp)))
def fuseFileConflictTestHelper(mounttmp):
created = self.api.collections().create(body={"manifest_text":self.test_manifest}).execute()
self.testcollectionuuid = str(created['uuid'])
- def runTest(self):
- self.make_mount(fuse.MagicDirectory, pdh_only=True)
+ def verify_pdh_only(self, pdh_only=False, skip_pdh_only=False):
+ if skip_pdh_only is True:
+ self.make_mount(fuse.MagicDirectory) # in this case, the default by_id applies
+ else:
+ self.make_mount(fuse.MagicDirectory, pdh_only=pdh_only)
mount_ls = llfuse.listdir(self.mounttmp)
self.assertIn('README', mount_ls)
for fn in mount_ls),
"new FUSE MagicDirectory lists Collection")
- # look up using pdh should succeed
+ # look up using pdh should succeed in all cases
self.assertDirContents(self.testcollection, ['thing1.txt'])
self.assertDirContents(os.path.join('by_id', self.testcollection),
['thing1.txt'])
with open(os.path.join(self.mounttmp, k)) as f:
self.assertEqual(v, f.read())
- # look up using uuid should fail
- with self.assertRaises(OSError):
+ # look up using uuid should fail when pdh_only is set
+ if pdh_only is True:
+ with self.assertRaises(OSError):
+ self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
+ ['thing1.txt'])
+ else:
self.assertDirContents(os.path.join('by_id', self.testcollectionuuid),
['thing1.txt'])
+
+ def test_with_pdh_only_true(self):  # pdh_only=True: the shared check expects a by_id lookup by UUID to raise OSError
+ self.verify_pdh_only(pdh_only=True)
+
+ def test_with_pdh_only_false(self):  # pdh_only=False: the shared check expects a by_id lookup by UUID to list the collection
+ self.verify_pdh_only(pdh_only=False)
+
+ def test_with_default_by_id(self):  # mount is made without passing pdh_only; UUID lookup under by_id is expected to succeed
+ self.verify_pdh_only(skip_pdh_only=True)