3198: Implement cache management for directory objects.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index a886e37f6240de191005110f6e2a15b26d986190..deb0cd35a321a847d484437ffbbb31b85756a2d2 100644 (file)
@@ -21,6 +21,7 @@ import calendar
 import threading
 import itertools
 import ciso8601
+import collections
 
 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
 from fusefile import StreamReaderFile
@@ -29,21 +30,71 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 
 
 class FileHandle(object):
-    """Connects a numeric file handle to a File or Directory object that has
+    """Connects a numeric file handle to a File object that has
     been opened by the client."""
 
-    def __init__(self, fh, entry):
+    def __init__(self, fh, fileobj):
+        # fh is the numeric handle; fileobj is the File object it refers to.
         self.fh = fh
-        self.entry = entry
+        self.fileobj = fileobj
+        # Count this handle as a user of fileobj until release() is called.
+        # NOTE(review): inc_use()/dec_use() are defined outside this hunk;
+        # presumably they guard the object against cache eviction -- confirm.
+        self.fileobj.inc_use()
+
+    def release(self):
+        # Balance the inc_use() call made in __init__.
+        self.fileobj.dec_use()
+
+
+class DirectoryHandle(object):
+    """Connects a numeric file handle to a Directory object that has
+    been opened by the client."""
+
+    def __init__(self, fh, dirobj, entries):
+        # fh: numeric handle.  dirobj: the opened Directory object.
+        # entries: the listing captured when the handle was created; opendir()
+        # passes a list of (name, object) pairs and readdir() indexes into it.
+        self.fh = fh
+        self.entries = entries
+        self.dirobj = dirobj
+        # Count this handle as a user of dirobj until release() is called.
+        self.dirobj.inc_use()
+
+    def release(self):
+        # Balance the inc_use() call made in __init__.
+        self.dirobj.dec_use()
+
+
+class ObjectCache(object):
+    """Least-recently-touched cache of objects that can be cleared.
+
+    Managed objects must provide a ``clear()`` method; a true return value
+    means the object may be dropped from the cache, a false one means it
+    has to stay.  Every managed object is tagged with a ``_cache_priority``
+    attribute, which is also its key in the internal ordered mapping, so
+    the first key is always the oldest entry.
+
+    The cap is soft: entries whose ``clear()`` returns a false value stay
+    in the cache even when it holds more than ``cap`` objects.
+    """
+
+    def __init__(self, cap):
+        """Create an empty cache that aims to hold at most `cap` objects."""
+        self._entries = collections.OrderedDict()
+        self._counter = itertools.count(1)
+        self.cap = cap
+
+    def cap_cache(self):
+        """Try to evict the oldest entries that exceed the cap.
+
+        Only the `len - cap` oldest entries are candidates; each one is
+        removed only if its ``clear()`` returns a true value.
+        """
+        excess = len(self._entries) - self.cap
+        if excess <= 0:
+            return
+        # Snapshot the candidate keys first: the mapping is mutated below.
+        for key in list(itertools.islice(self._entries, excess)):
+            capobj = self._entries.get(key)
+            if capobj is None:
+                # NOTE(review): clear() is defined elsewhere; if it re-enters
+                # this cache (e.g. through unmanage()) an earlier candidate
+                # may already have removed this key, so skip it.
+                continue
+            if capobj.clear():
+                self._entries.pop(key, None)
+
+    def manage(self, obj):
+        """Add `obj` as the newest entry, then enforce the cap."""
+        obj._cache_priority = next(self._counter)
+        self._entries[obj._cache_priority] = obj
+        self.cap_cache()
+
+    def touch(self, obj):
+        """Make `obj` the newest entry, managing it if it is not yet managed."""
+        # getattr() keeps this safe for objects that were never managed.
+        self._entries.pop(getattr(obj, '_cache_priority', None), None)
+        self.manage(obj)
+
+    def unmanage(self, obj):
+        """Clear `obj` and drop it from the cache.
+
+        The entry is kept when ``obj.clear()`` returns a false value.
+        Objects that are not currently managed are ignored.
+        """
+        key = getattr(obj, '_cache_priority', None)
+        if key in self._entries and obj.clear():
+            self._entries.pop(key, None)
 
 
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self):
+    def __init__(self, cache_cap=1000):
+        # cache_cap is passed to ObjectCache as its cap.
+        # _entries maps inode number -> entry; numbering starts at
+        # llfuse.ROOT_INODE.
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
+        self._obj_cache = ObjectCache(cap=cache_cap)
 
     def __getitem__(self, item):
+        # Look up the entry registered under inode number `item`
+        # (KeyError if there is none).
         return self._entries[item]
@@ -60,12 +111,20 @@ class Inodes(object):
     def __contains__(self, k):
+        # True when inode number `k` is currently registered.
         return k in self._entries
 
+    def touch(self, entry):
+        """Mark `entry` as the most recently used object in the object cache."""
+        self._obj_cache.touch(entry)
+
+    def cap_cache(self):
+        """Ask the object cache to evict entries that exceed its cap."""
+        self._obj_cache.cap_cache()
+
     def add_entry(self, entry):
+        # Assign the next inode number, register the entry under it, and
+        # hand it to the object cache (which may evict older entries).
+        # Returns the same entry so calls can be chained.
         entry.inode = next(self._counter)
         self._entries[entry.inode] = entry
+        self._obj_cache.manage(entry)
         return entry
 
     def del_entry(self, entry):
+        # Ask the object cache to clear and drop the entry, then invalidate
+        # the inode and remove it from the inode table.
+        # NOTE(review): unmanage() keeps the entry in the cache when
+        # entry.clear() returns a false value, so a deleted entry may remain
+        # referenced by the cache -- confirm this is intended.
+        self._obj_cache.unmanage(entry)
         llfuse.invalidate_inode(entry.inode)
         del self._entries[entry.inode]
 
@@ -82,10 +141,10 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8"):
+    def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000):
         super(Operations, self).__init__()
 
-        self.inodes = Inodes()
+        self.inodes = Inodes(cache_cap)
         self.uid = uid
         self.gid = gid
         self.encoding = encoding
@@ -187,11 +246,11 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.EBADF)
 
         # update atime
-        handle.entry._atime = time.time()
+        handle.fileobj._atime = time.time()
 
         try:
             with llfuse.lock_released:
-                return handle.entry.readfrom(off, size)
+                return handle.fileobj.readfrom(off, size)
         except arvados.errors.NotFoundError as e:
             _logger.warning("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
@@ -201,7 +260,12 @@ class Operations(llfuse.Operations):
 
     def release(self, fh):
+        # Let the handle drop its use reference before forgetting it; an
+        # unknown fh is ignored.  Afterwards give the object cache a chance
+        # to evict entries.
         if fh in self._filehandles:
+            self._filehandles[fh].release()
             del self._filehandles[fh]
+        self.inodes.cap_cache()
+
+    def releasedir(self, fh):
+        """Release a directory handle.
+
+        Directory handles are stored in the same table as file handles, so
+        this simply delegates to release().
+        """
+        self.release(fh)
 
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -224,8 +288,13 @@ class Operations(llfuse.Operations):
         # update atime
         p._atime = time.time()
 
-        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
-        return fh
+        try:
+            p.inc_use()
+            self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+            return fh
+        finally:
+            p.dec_use()
+
 
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -235,20 +304,17 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        _logger.debug("arv-mount handle.entry %s", handle.entry)
+        _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
 
         e = off
-        while e < len(handle.entry):
-            if handle.entry[e][1].inode in self.inodes:
+        while e < len(handle.entries):
+            if handle.entries[e][1].inode in self.inodes:
                 try:
-                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
+                    yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
                 except UnicodeEncodeError:
                     pass
             e += 1
 
-    def releasedir(self, fh):
-        del self._filehandles[fh]
-
     def statfs(self):
         st = llfuse.StatvfsData()
         st.f_bsize = 64 * 1024