From d8d82841f43394b3781804844d4860bc8205d5fc Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 13 Apr 2015 16:42:46 -0400 Subject: [PATCH] 3198: Implement cache management for directory objects. --- services/fuse/arvados_fuse/__init__.py | 100 +++++++++++++++++---- services/fuse/arvados_fuse/fusedir.py | 116 ++++++++++++++++--------- services/fuse/arvados_fuse/fusefile.py | 8 ++ services/fuse/tests/test_mount.py | 5 +- 4 files changed, 169 insertions(+), 60 deletions(-) diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index a886e37f62..deb0cd35a3 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -21,6 +21,7 @@ import calendar import threading import itertools import ciso8601 +import collections from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory from fusefile import StreamReaderFile @@ -29,21 +30,71 @@ _logger = logging.getLogger('arvados.arvados_fuse') class FileHandle(object): - """Connects a numeric file handle to a File or Directory object that has + """Connects a numeric file handle to a File object that has been opened by the client.""" - def __init__(self, fh, entry): + def __init__(self, fh, fileobj): self.fh = fh - self.entry = entry + self.fileobj = fileobj + self.fileobj.inc_use() + + def release(self): + self.fileobj.dec_use() + + +class DirectoryHandle(object): + """Connects a numeric file handle to a Directory object that has + been opened by the client.""" + + def __init__(self, fh, dirobj, entries): + self.fh = fh + self.entries = entries + self.dirobj = dirobj + self.dirobj.inc_use() + + def release(self): + self.dirobj.dec_use() + + +class ObjectCache(object): + def __init__(self, cap): + self._entries = collections.OrderedDict() + self._counter = itertools.count(1) + self.cap = cap + + def cap_cache(self): + if len(self._entries) > self.cap: + ent = iter(self._entries) + ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)] + for key in ents: + capobj = self._entries[key] + if capobj.clear(): + del self._entries[key] + + def manage(self, obj): + obj._cache_priority = next(self._counter) + self._entries[obj._cache_priority] = obj + self.cap_cache() + + def touch(self, obj): + if obj._cache_priority in self._entries: + del self._entries[obj._cache_priority] + self.manage(obj) + + def unmanage(self, obj): + if obj._cache_priority in self._entries: + if obj.clear(): + del self._entries[obj._cache_priority] class Inodes(object): """Manage the set of inodes. This is the mapping from a numeric id to a concrete File or Directory object""" - def __init__(self): + def __init__(self, cache_cap=1000): self._entries = {} self._counter = itertools.count(llfuse.ROOT_INODE) + self._obj_cache = ObjectCache(cap=cache_cap) def __getitem__(self, item): return self._entries[item] @@ -60,12 +111,20 @@ class Inodes(object): def __contains__(self, k): return k in self._entries + def touch(self, entry): + self._obj_cache.touch(entry) + + def cap_cache(self): + self._obj_cache.cap_cache() + def add_entry(self, entry): entry.inode = next(self._counter) self._entries[entry.inode] = entry + self._obj_cache.manage(entry) return entry def del_entry(self, entry): + self._obj_cache.unmanage(entry) llfuse.invalidate_inode(entry.inode) del self._entries[entry.inode] @@ -82,10 +141,10 @@ class Operations(llfuse.Operations): """ - def __init__(self, uid, gid, encoding="utf-8"): + def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000): super(Operations, self).__init__() - self.inodes = Inodes() + self.inodes = Inodes(cache_cap) self.uid = uid self.gid = gid self.encoding = encoding @@ -187,11 +246,11 @@ class Operations(llfuse.Operations): raise llfuse.FUSEError(errno.EBADF) # update atime - handle.entry._atime = time.time() + handle.fileobj._atime = time.time() try: with llfuse.lock_released: - return handle.entry.readfrom(off, size) + return handle.fileobj.readfrom(off, size) except arvados.errors.NotFoundError as e: _logger.warning("Block not found: " + str(e)) raise llfuse.FUSEError(errno.EIO) @@ -201,7 +260,12 @@ class Operations(llfuse.Operations): def release(self, fh): if fh in self._filehandles: + self._filehandles[fh].release() del self._filehandles[fh] + self.inodes.cap_cache() + + def releasedir(self, fh): + self.release(fh) def opendir(self, inode): _logger.debug("arv-mount opendir: inode %i", inode) @@ -224,8 +288,13 @@ class Operations(llfuse.Operations): # update atime p._atime = time.time() - self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items())) - return fh + try: + p.inc_use() + self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items())) + return fh + finally: + p.dec_use() + def readdir(self, fh, off): _logger.debug("arv-mount readdir: fh %i off %i", fh, off) @@ -235,20 +304,17 @@ class Operations(llfuse.Operations): else: raise llfuse.FUSEError(errno.EBADF) - _logger.debug("arv-mount handle.entry %s", handle.entry) + _logger.debug("arv-mount handle.dirobj %s", handle.dirobj) e = off - while e < len(handle.entry): - if handle.entry[e][1].inode in self.inodes: + while e < len(handle.entries): + if handle.entries[e][1].inode in self.inodes: try: - yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1) + yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1) except UnicodeEncodeError: pass e += 1 - def releasedir(self, fh): - del self._filehandles[fh] - def statfs(self): st = llfuse.StatvfsData() st.f_bsize = 64 * 1024 diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index 11446220b0..cfa81b34b7 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -4,6 +4,7 @@ import time import llfuse import arvados import apiclient +import functools from fusefile import StringFile, StreamReaderFile, ObjectFile from fresh import FreshBase, convertTime @@ -31,6 +32,16 @@ def sanitize_filename(dirty): else: return _disallowed_filename_characters.sub('_', dirty) +def use_counter(orig_func): + @functools.wraps(orig_func) + def use_counter_wrapper(self, *args, **kwargs): + try: + self.inc_use() + return orig_func(self, *args, **kwargs) + finally: + self.dec_use() + return use_counter_wrapper + class Directory(FreshBase): """Generic directory object, backed by a dict. @@ -39,7 +50,7 @@ class Directory(FreshBase): and the value referencing a File or Directory object. """ - def __init__(self, parent_inode): + def __init__(self, parent_inode, inodes): super(Directory, self).__init__() """parent_inode is the integer inode number""" @@ -47,11 +58,14 @@ class Directory(FreshBase): if not isinstance(parent_inode, int): raise Exception("parent_inode should be an int") self.parent_inode = parent_inode + self.inodes = inodes self._entries = {} self._mtime = time.time() + self.use_count = 0 # Overriden by subclasses to implement logic to update the entries dict # when the directory is stale + @use_counter def update(self): pass @@ -60,6 +74,15 @@ class Directory(FreshBase): def size(self): return 0 + def in_use(self): + return self.use_count > 0 + + def inc_use(self): + self.use_count += 1 + + def dec_use(self): + self.use_count -= 1 + def checkupdate(self): if self.stale(): try: @@ -67,22 +90,25 @@ class Directory(FreshBase): except apiclient.errors.HttpError as e: _logger.debug(e) + @use_counter def __getitem__(self, item): self.checkupdate() return self._entries[item] + @use_counter def items(self): self.checkupdate() - return self._entries.items() - - def __iter__(self): - self.checkupdate() - return self._entries.iterkeys() + return list(self._entries.items()) + @use_counter def __contains__(self, k): self.checkupdate() return k in self._entries + def fresh(self): + self.inodes.touch(self) + super(Directory, self).fresh() + def merge(self, items, fn, same, new_entry): """Helper method for updating the contents of the directory. @@ -132,17 +158,26 @@ class Directory(FreshBase): self.fresh() - def clear(self): + def clear(self, force=False): """Delete all entries""" - oldentries = self._entries - self._entries = {} - for n in oldentries: - if isinstance(n, Directory): - n.clear() - llfuse.invalidate_entry(self.inode, str(n)) - self.inodes.del_entry(oldentries[n]) - llfuse.invalidate_inode(self.inode) - self.invalidate() + + if not self.in_use() or force: + oldentries = self._entries + self._entries = {} + for n in oldentries: + if isinstance(n, Directory): + if not n.clear(force): + self._entries = oldentries + return False + for n in oldentries: + if isinstance(n, Directory): + llfuse.invalidate_entry(self.inode, str(n)) + self.inodes.del_entry(oldentries[n]) + llfuse.invalidate_inode(self.inode) + self.invalidate() + return True + else: + return False def mtime(self): return self._mtime @@ -152,8 +187,7 @@ class CollectionDirectory(Directory): """Represents the root of a directory tree holding a collection.""" def __init__(self, parent_inode, inodes, api, num_retries, collection): - super(CollectionDirectory, self).__init__(parent_inode) - self.inodes = inodes + super(CollectionDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries self.collection_object_file = None @@ -187,14 +221,17 @@ class CollectionDirectory(Directory): if self.collection_object_file is not None: self.collection_object_file.update(self.collection_object) - self.clear() + self.clear(force=True) for s in coll_reader.all_streams(): cwd = self for part in s.name().split('/'): if part != '' and part != '.': partname = sanitize_filename(part) if partname not in cwd._entries: - cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode)) + cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes)) + # (hack until using new API) + cwd._entries[partname].inc_use() + # end hack cwd = cwd._entries[partname] for k, v in s.files().items(): cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime())) @@ -257,6 +294,16 @@ class CollectionDirectory(Directory): else: return super(CollectionDirectory, self).__contains__(k) + def invalidate(self): + super(CollectionDirectory, self).invalidate() + self.collection_object = None + + def clear(self, force=False): + if self.collection_locator is None: + return False + else: + return super(CollectionDirectory, self).clear(force) + class MagicDirectory(Directory): """A special directory that logically contains the set of all extant keep locators. @@ -281,8 +328,7 @@ will appear if it exists. """.lstrip() def __init__(self, parent_inode, inodes, api, num_retries): - super(MagicDirectory, self).__init__(parent_inode) - self.inodes = inodes + super(MagicDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries @@ -308,6 +354,7 @@ will appear if it exists. try: e = self.inodes.add_entry(CollectionDirectory( self.inode, self.inodes, self.api, self.num_retries, k)) + if e.update(): self._entries[k] = e return True @@ -323,28 +370,25 @@ will appear if it exists. else: raise KeyError("No collection with id " + item) + def clear(self, force=False): + pass + class RecursiveInvalidateDirectory(Directory): def invalidate(self): - if self.inode == llfuse.ROOT_INODE: - llfuse.lock.acquire() try: super(RecursiveInvalidateDirectory, self).invalidate() for a in self._entries: self._entries[a].invalidate() except Exception: _logger.exception() - finally: - if self.inode == llfuse.ROOT_INODE: - llfuse.lock.release() class TagsDirectory(RecursiveInvalidateDirectory): """A special directory that contains as subdirectories all tags visible to the user.""" def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60): - super(TagsDirectory, self).__init__(parent_inode) - self.inodes = inodes + super(TagsDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries self._poll = True @@ -370,8 +414,7 @@ class TagDirectory(Directory): def __init__(self, parent_inode, inodes, api, num_retries, tag, poll=False, poll_time=60): - super(TagDirectory, self).__init__(parent_inode) - self.inodes = inodes + super(TagDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries self.tag = tag @@ -397,8 +440,7 @@ class ProjectDirectory(Directory): def __init__(self, parent_inode, inodes, api, num_retries, project_object, poll=False, poll_time=60): - super(ProjectDirectory, self).__init__(parent_inode) - self.inodes = inodes + super(ProjectDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries self.project_object = project_object @@ -462,11 +504,6 @@ class ProjectDirectory(Directory): contents = arvados.util.list_all(self.api.groups().contents, self.num_retries, uuid=self.uuid) - # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use. - contents += arvados.util.list_all( - self.api.links().list, self.num_retries, - filters=[['tail_uuid', '=', self.uuid], - ['link_class', '=', 'name']]) # end with llfuse.lock_released, re-acquire lock @@ -494,8 +531,7 @@ class SharedDirectory(Directory): def __init__(self, parent_inode, inodes, api, num_retries, exclude, poll=False, poll_time=60): - super(SharedDirectory, self).__init__(parent_inode) - self.inodes = inodes + super(SharedDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries self.current_user = api.users().current().execute(num_retries=num_retries) diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py index 6dd2d8a3ee..e122d9d2ed 100644 --- a/services/fuse/arvados_fuse/fusefile.py +++ b/services/fuse/arvados_fuse/fusefile.py @@ -24,6 +24,14 @@ class File(FreshBase): def mtime(self): return self._mtime + def clear(self): + pass + + def inc_use(self): + pass + + def dec_use(self): + pass class StreamReaderFile(File): """Wraps a StreamFileReader as a file.""" diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py index 764a099149..4802d69c31 100644 --- a/services/fuse/tests/test_mount.py +++ b/services/fuse/tests/test_mount.py @@ -25,7 +25,7 @@ class MountTestBase(unittest.TestCase): self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings()) def make_mount(self, root_class, **root_kwargs): - operations = fuse.Operations(os.getuid(), os.getgid()) + operations = fuse.Operations(os.getuid(), os.getgid(), cache_cap=2) operations.inodes.add_entry(root_class( llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs)) llfuse.init(operations, self.mounttmp, []) @@ -243,8 +243,7 @@ class FuseSharedTest(MountTestBase): # directory) fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User')) fuse_user_objs.sort() - self.assertEqual(['Empty collection.link', # permission link on collection - 'FUSE Test Project', # project owned by user + self.assertEqual(['FUSE Test Project', # project owned by user 'collection #1 owned by FUSE', # collection owned by user 'collection #2 owned by FUSE', # collection owned by user 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user -- 2.30.2