def __init__(self, cap, min_entries=4):
+ # Managed objects in insertion order; manage() stores each one under
+ # its inode number.
self._entries = collections.OrderedDict()
+ # Maps a cache_uuid to the list of managed objects that reported it.
self._by_uuid = {}
- self._counter = itertools.count(0)
+ # Limit that cap_cache() compares the running total against.
self.cap = cap
+ # Running total of the sizes of the managed objects.
self._total = 0
+ # cap_cache() stops evicting once fewer than this many entries remain.
self.min_entries = min_entries
return self._total
def _remove(self, obj, clear):
- if clear and not obj.clear():
- _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
- return False
+ """Drop obj from the cache bookkeeping.
+
+ With clear true, obj.clear() is called first, but nothing is removed
+ when obj.in_use() is true, or when obj.has_ref(True) is true (in that
+ case a kernel invalidate is sent instead). With clear false the entry
+ is removed without touching the object's contents.
+
+ Always returns None.
+ """
+ if clear:
+ if obj.in_use():
+ _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+ return
+ if obj.has_ref(True):
+ obj.kernel_invalidate()
+ _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
+ return
+ obj.clear()
+
+ # The llfuse lock is released in del_entry(), which is called by
+ # Directory.clear(). While the llfuse lock is released, it can happen
+ # that a reentrant call removes this entry before this call gets to it.
+ # Ensure that the entry is still valid before trying to remove it.
+ if obj.inode not in self._entries:
+ return
+
+ # Reverse the accounting done by manage(): size total, inode index,
+ # then the per-uuid index.
self._total -= obj.cache_size
- del self._entries[obj.cache_priority]
+ del self._entries[obj.inode]
if obj.cache_uuid:
self._by_uuid[obj.cache_uuid].remove(obj)
+ # Drop the uuid bucket once its last object is gone.
if not self._by_uuid[obj.cache_uuid]:
del self._by_uuid[obj.cache_uuid]
obj.cache_uuid = None
if clear:
- _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
- return True
+ _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
def cap_cache(self):
+ """Evict entries, in insertion order, while the total exceeds the cap.
+
+ Eviction stops as soon as the total drops below the cap or fewer
+ than min_entries entries remain.
+ """
if self._total > self.cap:
- for key in list(self._entries.keys()):
+ # Iterate over a snapshot: _remove() deletes from self._entries, and
+ # on Python 3 values() is a live view, so mutating the OrderedDict
+ # while iterating it raises RuntimeError.
+ for ent in list(self._entries.values()):
if self._total < self.cap or len(self._entries) < self.min_entries:
break
- self._remove(self._entries[key], True)
+ self._remove(ent, True)
def manage(self, obj):
+ """Start tracking obj in the cache if it is persisted.
+
+ Records the object's size and uuid, indexes it by inode (and by uuid
+ when it has one), adds its size to the running total and then calls
+ cap_cache().
+ """
if obj.persisted():
- obj.cache_priority = next(self._counter)
obj.cache_size = obj.objsize()
- self._entries[obj.cache_priority] = obj
+ self._entries[obj.inode] = obj
obj.cache_uuid = obj.uuid()
if obj.cache_uuid:
if obj.cache_uuid not in self._by_uuid:
if obj not in self._by_uuid[obj.cache_uuid]:
self._by_uuid[obj.cache_uuid].append(obj)
+ # NOTE(review): this calls objsize() again instead of reusing
+ # obj.cache_size, which is the value _remove() later subtracts --
+ # confirm the two calls always agree.
self._total += obj.objsize()
- _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+ _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
self.cap_cache()
- else:
- obj.cache_priority = None
def touch(self, obj):
+ """Refresh the cache entry of a persisted obj.
+
+ An existing entry is removed without clearing the object, and
+ manage() adds it back.
+ """
if obj.persisted():
- if obj.cache_priority in self._entries:
+ if obj.inode in self._entries:
self._remove(obj, False)
self.manage(obj)
def unmanage(self, obj):
+ """Ask _remove() to clear and drop obj if it is persisted and has an entry."""
- if obj.persisted() and obj.cache_priority in self._entries:
+ if obj.persisted() and obj.inode in self._entries:
self._remove(obj, True)
def find_by_uuid(self, uuid):
fh = next(self._filehandles_counter)
self._filehandles[fh] = FileHandle(fh, p)
self.inodes.touch(p)
+ while p.parent_inode in self.inodes:
+ if p == self.inodes[p.parent_inode]:
+ break
+ p = self.inodes[p.parent_inode]
+ self.inodes.touch(p)
+ p.checkupdate()
_logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
@catch_exceptions
def create(self, inode_parent, name, mode, flags, ctx):
- _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
+ _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
p = self._check_writable(inode_parent)
p.create(name)
@catch_exceptions
def mkdir(self, inode_parent, name, mode, ctx):
- _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
+ _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
p = self._check_writable(inode_parent)
p.mkdir(name)
@catch_exceptions
def unlink(self, inode_parent, name):
- _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
+ _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
@catch_exceptions
def rmdir(self, inode_parent, name):
- _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
+ _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
@catch_exceptions
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
- _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
+ _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
src = self._check_writable(inode_parent_old)
dest = self._check_writable(inode_parent_new)
dest.rename(name_old, name_new, src)
logger = logging.getLogger('arvados.arv-mount')
+ class AssertWithTimeout(object):
+     """Allow some time for an assertion to pass.
+
+     Iterating over an instance yields its ``attempt`` method again and
+     again until one call to it completes without raising AssertionError.
+     A failing call sleeps briefly and lets the loop retry; once the
+     deadline has passed the AssertionError propagates instead.
+
+     Usage::
+
+         for attempt in AssertWithTimeout(10):
+             attempt(self.assertEqual, expected, compute())
+     """
+
+     def __init__(self, timeout=0):
+         # Seconds, counted from the start of iteration, during which a
+         # failing assertion is retried rather than raised.
+         self.timeout = timeout
+
+     def __iter__(self):
+         # The clock starts when the for-loop begins, not at construction.
+         self.deadline = time.time() + self.timeout
+         self.done = False
+         return self
+
+     def __next__(self):
+         if self.done:
+             raise StopIteration
+         return self.attempt
+
+     # Python 2 spells the iterator protocol method ``next``; keep both so
+     # the helper iterates under either interpreter.
+     next = __next__
+
+     def attempt(self, fn, *args, **kwargs):
+         """Call fn(*args, **kwargs), tolerating AssertionError until the deadline."""
+         try:
+             fn(*args, **kwargs)
+         except AssertionError:
+             if time.time() > self.deadline:
+                 raise
+             time.sleep(0.1)
+         else:
+             self.done = True
+
+
class FuseMountTest(MountTestBase):
def setUp(self):
super(FuseMountTest, self).setUp()
bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
self.tag_collection(bar_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertIn('fuse_test_tag', llfuse.listdir(self.mounttmp))
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertIn, 'fuse_test_tag', llfuse.listdir(self.mounttmp))
self.assertDirContents('fuse_test_tag', [bar_uuid])
baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
l = self.tag_collection(baz_uuid, 'fuse_test_tag')
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid, baz_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid, baz_uuid])
self.api.links().delete(uuid=l['uuid']).execute()
- time.sleep(1)
- self.assertDirContents('fuse_test_tag', [bar_uuid])
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
class FuseSharedTest(MountTestBase):
with collection2.open("file1.txt", "w") as f:
f.write("foo")
- time.sleep(1)
-
- # should show up via event bus notify
-
- d1 = llfuse.listdir(os.path.join(self.mounttmp))
- self.assertEqual(["file1.txt"], sorted(d1))
+ for attempt in AssertWithTimeout(10):
+ attempt(self.assertEqual, ["file1.txt"], llfuse.listdir(os.path.join(self.mounttmp)))
def fuseFileConflictTestHelper(mounttmp):
def test_with_default_by_id(self):
self.verify_pdh_only(skip_pdh_only=True)
-
-def _test_refresh_old_manifest(zzz):
- fnm = 'zzzzz-8i9sb-0vsrcqi7whchuil.log.txt'
- os.listdir(os.path.join(zzz))
- time.sleep(3)
- with open(os.path.join(zzz, fnm)) as f:
- f.read()
-
-class TokenExpiryTest(MountTestBase):
- def setUp(self):
- super(TokenExpiryTest, self).setUp(local_store=False)
-
- @unittest.skip("bug #10008")
- @mock.patch('arvados.keep.KeepClient.get')
- def runTest(self, mocked_get):
- self.api._rootDesc = {"blobSignatureTtl": 2}
- mnt = self.make_mount(fuse.CollectionDirectory, collection_record='zzzzz-4zz18-op4e2lbej01tcvu')
- mocked_get.return_value = 'fake data'
-
- old_exp = int(time.time()) + 86400*14
- self.pool.apply(_test_refresh_old_manifest, (self.mounttmp,))
- want_exp = int(time.time()) + 86400*14
-
- got_loc = mocked_get.call_args[0][0]
- got_exp = int(
- re.search(r'\+A[0-9a-f]+@([0-9a-f]+)', got_loc).group(1),
- 16)
- self.assertGreaterEqual(
- got_exp, want_exp-2,
- msg='now+2w = {:x}, but fuse fetched locator {} (old_exp {:x})'.format(
- want_exp, got_loc, old_exp))
- self.assertLessEqual(
- got_exp, want_exp,
- msg='server is not using the expected 2w TTL; test is ineffective')