3198: Implement cache management for directory objects.
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 13 Apr 2015 20:42:46 +0000 (16:42 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 13 Apr 2015 20:42:46 +0000 (16:42 -0400)
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py
services/fuse/tests/test_mount.py

index a886e37f6240de191005110f6e2a15b26d986190..deb0cd35a321a847d484437ffbbb31b85756a2d2 100644 (file)
@@ -21,6 +21,7 @@ import calendar
 import threading
 import itertools
 import ciso8601
+import collections
 
 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
 from fusefile import StreamReaderFile
@@ -29,21 +30,71 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 
 
 class FileHandle(object):
-    """Connects a numeric file handle to a File or Directory object that has
+    """Connects a numeric file handle to a File object that has
     been opened by the client."""
 
-    def __init__(self, fh, entry):
+    def __init__(self, fh, fileobj):
         self.fh = fh
-        self.entry = entry
+        self.fileobj = fileobj
+        self.fileobj.inc_use()
+
+    def release(self):
+        self.fileobj.dec_use()
+
+
+class DirectoryHandle(object):
+    """Connects a numeric file handle to a Directory object that has
+    been opened by the client."""
+
+    def __init__(self, fh, dirobj, entries):
+        self.fh = fh
+        self.entries = entries
+        self.dirobj = dirobj
+        self.dirobj.inc_use()
+
+    def release(self):
+        self.dirobj.dec_use()
+
+
+class ObjectCache(object):
+    def __init__(self, cap):
+        self._entries = collections.OrderedDict()
+        self._counter = itertools.count(1)
+        self.cap = cap
+
+    def cap_cache(self):
+        if len(self._entries) > self.cap:
+            ent = iter(self._entries)
+            ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
+            for key in ents:
+                capobj = self._entries[key]
+                if capobj.clear():
+                    del self._entries[key]
+
+    def manage(self, obj):
+        obj._cache_priority = next(self._counter)
+        self._entries[obj._cache_priority] = obj
+        self.cap_cache()
+
+    def touch(self, obj):
+        if obj._cache_priority in self._entries:
+            del self._entries[obj._cache_priority]
+        self.manage(obj)
+
+    def unmanage(self, obj):
+        if obj._cache_priority in self._entries:
+            if obj.clear():
+                del self._entries[obj._cache_priority]
 
 
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self):
+    def __init__(self, cache_cap=1000):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
+        self._obj_cache = ObjectCache(cap=cache_cap)
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -60,12 +111,20 @@ class Inodes(object):
     def __contains__(self, k):
         return k in self._entries
 
+    def touch(self, entry):
+        self._obj_cache.touch(entry)
+
+    def cap_cache(self):
+        self._obj_cache.cap_cache()
+
     def add_entry(self, entry):
         entry.inode = next(self._counter)
         self._entries[entry.inode] = entry
+        self._obj_cache.manage(entry)
         return entry
 
     def del_entry(self, entry):
+        self._obj_cache.unmanage(entry)
         llfuse.invalidate_inode(entry.inode)
         del self._entries[entry.inode]
 
@@ -82,10 +141,10 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8"):
+    def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000):
         super(Operations, self).__init__()
 
-        self.inodes = Inodes()
+        self.inodes = Inodes(cache_cap)
         self.uid = uid
         self.gid = gid
         self.encoding = encoding
@@ -187,11 +246,11 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.EBADF)
 
         # update atime
-        handle.entry._atime = time.time()
+        handle.fileobj._atime = time.time()
 
         try:
             with llfuse.lock_released:
-                return handle.entry.readfrom(off, size)
+                return handle.fileobj.readfrom(off, size)
         except arvados.errors.NotFoundError as e:
             _logger.warning("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
@@ -201,7 +260,12 @@ class Operations(llfuse.Operations):
 
     def release(self, fh):
         if fh in self._filehandles:
+            self._filehandles[fh].release()
             del self._filehandles[fh]
+        self.inodes.cap_cache()
+
+    def releasedir(self, fh):
+        self.release(fh)
 
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -224,8 +288,13 @@ class Operations(llfuse.Operations):
         # update atime
         p._atime = time.time()
 
-        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
-        return fh
+        try:
+            p.inc_use()
+            self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+            return fh
+        finally:
+            p.dec_use()
+
 
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -235,20 +304,17 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        _logger.debug("arv-mount handle.entry %s", handle.entry)
+        _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
 
         e = off
-        while e < len(handle.entry):
-            if handle.entry[e][1].inode in self.inodes:
+        while e < len(handle.entries):
+            if handle.entries[e][1].inode in self.inodes:
                 try:
-                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
+                    yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
                 except UnicodeEncodeError:
                     pass
             e += 1
 
-    def releasedir(self, fh):
-        del self._filehandles[fh]
-
     def statfs(self):
         st = llfuse.StatvfsData()
         st.f_bsize = 64 * 1024
index 11446220b0d43fe3e023668244a0d86fd413e601..cfa81b34b7a3d78e0b260f748facbc29f04c9897 100644 (file)
@@ -4,6 +4,7 @@ import time
 import llfuse
 import arvados
 import apiclient
+import functools
 
 from fusefile import StringFile, StreamReaderFile, ObjectFile
 from fresh import FreshBase, convertTime
@@ -31,6 +32,16 @@ def sanitize_filename(dirty):
     else:
         return _disallowed_filename_characters.sub('_', dirty)
 
+def use_counter(orig_func):
+    @functools.wraps(orig_func)
+    def use_counter_wrapper(self, *args, **kwargs):
+        try:
+            self.inc_use()
+            return orig_func(self, *args, **kwargs)
+        finally:
+            self.dec_use()
+    return use_counter_wrapper
+
 
 class Directory(FreshBase):
     """Generic directory object, backed by a dict.
@@ -39,7 +50,7 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
-    def __init__(self, parent_inode):
+    def __init__(self, parent_inode, inodes):
         super(Directory, self).__init__()
 
         """parent_inode is the integer inode number"""
@@ -47,11 +58,14 @@ class Directory(FreshBase):
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
+        self.inodes = inodes
         self._entries = {}
         self._mtime = time.time()
+        self.use_count = 0
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
+    @use_counter
     def update(self):
         pass
 
@@ -60,6 +74,15 @@ class Directory(FreshBase):
     def size(self):
         return 0
 
+    def in_use(self):
+        return self.use_count > 0
+
+    def inc_use(self):
+        self.use_count += 1
+
+    def dec_use(self):
+        self.use_count -= 1
+
     def checkupdate(self):
         if self.stale():
             try:
@@ -67,22 +90,25 @@ class Directory(FreshBase):
             except apiclient.errors.HttpError as e:
                 _logger.debug(e)
 
+    @use_counter
     def __getitem__(self, item):
         self.checkupdate()
         return self._entries[item]
 
+    @use_counter
     def items(self):
         self.checkupdate()
-        return self._entries.items()
-
-    def __iter__(self):
-        self.checkupdate()
-        return self._entries.iterkeys()
+        return list(self._entries.items())
 
+    @use_counter
     def __contains__(self, k):
         self.checkupdate()
         return k in self._entries
 
+    def fresh(self):
+        self.inodes.touch(self)
+        super(Directory, self).fresh()
+
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
 
@@ -132,17 +158,26 @@ class Directory(FreshBase):
 
         self.fresh()
 
-    def clear(self):
+    def clear(self, force=False):
         """Delete all entries"""
-        oldentries = self._entries
-        self._entries = {}
-        for n in oldentries:
-            if isinstance(n, Directory):
-                n.clear()
-            llfuse.invalidate_entry(self.inode, str(n))
-            self.inodes.del_entry(oldentries[n])
-        llfuse.invalidate_inode(self.inode)
-        self.invalidate()
+
+        if not self.in_use() or force:
+            oldentries = self._entries
+            self._entries = {}
+            for n in oldentries:
+                if isinstance(n, Directory):
+                    if not n.clear(force):
+                        self._entries = oldentries
+                        return False
+            for n in oldentries:
+                if isinstance(n, Directory):
+                    llfuse.invalidate_entry(self.inode, str(n))
+                    self.inodes.del_entry(oldentries[n])
+            llfuse.invalidate_inode(self.inode)
+            self.invalidate()
+            return True
+        else:
+            return False
 
     def mtime(self):
         return self._mtime
@@ -152,8 +187,7 @@ class CollectionDirectory(Directory):
     """Represents the root of a directory tree holding a collection."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, collection):
-        super(CollectionDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(CollectionDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.collection_object_file = None
@@ -187,14 +221,17 @@ class CollectionDirectory(Directory):
         if self.collection_object_file is not None:
             self.collection_object_file.update(self.collection_object)
 
-        self.clear()
+        self.clear(force=True)
         for s in coll_reader.all_streams():
             cwd = self
             for part in s.name().split('/'):
                 if part != '' and part != '.':
                     partname = sanitize_filename(part)
                     if partname not in cwd._entries:
-                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
+                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
+                        # (hack until using new API)
+                        cwd._entries[partname].inc_use()
+                        # end hack
                     cwd = cwd._entries[partname]
             for k, v in s.files().items():
                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
@@ -257,6 +294,16 @@ class CollectionDirectory(Directory):
         else:
             return super(CollectionDirectory, self).__contains__(k)
 
+    def invalidate(self):
+        super(CollectionDirectory, self).invalidate()
+        self.collection_object = None
+
+    def clear(self, force=False):
+        if self.collection_locator is None:
+            return False
+        else:
+            return super(CollectionDirectory, self).clear(force)
+
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
@@ -281,8 +328,7 @@ will appear if it exists.
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries):
-        super(MagicDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(MagicDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
 
@@ -308,6 +354,7 @@ will appear if it exists.
         try:
             e = self.inodes.add_entry(CollectionDirectory(
                     self.inode, self.inodes, self.api, self.num_retries, k))
+
             if e.update():
                 self._entries[k] = e
                 return True
@@ -323,28 +370,25 @@ will appear if it exists.
         else:
             raise KeyError("No collection with id " + item)
 
+    def clear(self, force=False):
+        pass
+
 
 class RecursiveInvalidateDirectory(Directory):
     def invalidate(self):
-        if self.inode == llfuse.ROOT_INODE:
-            llfuse.lock.acquire()
         try:
             super(RecursiveInvalidateDirectory, self).invalidate()
             for a in self._entries:
                 self._entries[a].invalidate()
         except Exception:
             _logger.exception()
-        finally:
-            if self.inode == llfuse.ROOT_INODE:
-                llfuse.lock.release()
 
 
 class TagsDirectory(RecursiveInvalidateDirectory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagsDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -370,8 +414,7 @@ class TagDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, tag,
                  poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
@@ -397,8 +440,7 @@ class ProjectDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                  poll=False, poll_time=60):
-        super(ProjectDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(ProjectDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
@@ -462,11 +504,6 @@ class ProjectDirectory(Directory):
 
             contents = arvados.util.list_all(self.api.groups().contents,
                                              self.num_retries, uuid=self.uuid)
-            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(
-                self.api.links().list, self.num_retries,
-                filters=[['tail_uuid', '=', self.uuid],
-                         ['link_class', '=', 'name']])
 
         # end with llfuse.lock_released, re-acquire lock
 
@@ -494,8 +531,7 @@ class SharedDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                  poll=False, poll_time=60):
-        super(SharedDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(SharedDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.current_user = api.users().current().execute(num_retries=num_retries)
index 6dd2d8a3ee279fc1514ad5a295459c6c7dfcad87..e122d9d2edc2118143fc7e78310dc69cb4fff8ce 100644 (file)
@@ -24,6 +24,14 @@ class File(FreshBase):
     def mtime(self):
         return self._mtime
 
+    def clear(self):
+        pass
+
+    def inc_use(self):
+        pass
+
+    def dec_use(self):
+        pass
 
 class StreamReaderFile(File):
     """Wraps a StreamFileReader as a file."""
index 764a099149a452ffe4258f2f8a0b922fd381b83a..4802d69c313f1a1053ef2b3015e984e856d77a60 100644 (file)
@@ -25,7 +25,7 @@ class MountTestBase(unittest.TestCase):
         self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def make_mount(self, root_class, **root_kwargs):
-        operations = fuse.Operations(os.getuid(), os.getgid())
+        operations = fuse.Operations(os.getuid(), os.getgid(), cache_cap=2)
         operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(operations, self.mounttmp, [])
@@ -243,8 +243,7 @@ class FuseSharedTest(MountTestBase):
         # directory)
         fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
         fuse_user_objs.sort()
-        self.assertEqual(['Empty collection.link',                # permission link on collection
-                          'FUSE Test Project',                    # project owned by user
+        self.assertEqual(['FUSE Test Project',                    # project owned by user
                           'collection #1 owned by FUSE',          # collection owned by user
                           'collection #2 owned by FUSE',          # collection owned by user
                           'pipeline instance owned by FUSE.pipelineInstance',  # pipeline instance owned by user