3198: Implement cache management for directory objects.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 11446220b0d43fe3e023668244a0d86fd413e601..cfa81b34b7a3d78e0b260f748facbc29f04c9897 100644 (file)
@@ -4,6 +4,7 @@ import time
 import llfuse
 import arvados
 import apiclient
+import functools
 
 from fusefile import StringFile, StreamReaderFile, ObjectFile
 from fresh import FreshBase, convertTime
@@ -31,6 +32,16 @@ def sanitize_filename(dirty):
     else:
         return _disallowed_filename_characters.sub('_', dirty)
 
+def use_counter(orig_func):
+    @functools.wraps(orig_func)
+    def use_counter_wrapper(self, *args, **kwargs):
+        try:
+            self.inc_use()
+            return orig_func(self, *args, **kwargs)
+        finally:
+            self.dec_use()
+    return use_counter_wrapper
+
 
 class Directory(FreshBase):
     """Generic directory object, backed by a dict.
@@ -39,7 +50,7 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
-    def __init__(self, parent_inode):
+    def __init__(self, parent_inode, inodes):
         super(Directory, self).__init__()
 
         """parent_inode is the integer inode number"""
@@ -47,11 +58,14 @@ class Directory(FreshBase):
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
+        self.inodes = inodes
         self._entries = {}
         self._mtime = time.time()
+        self.use_count = 0
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
+    @use_counter
     def update(self):
         pass
 
@@ -60,6 +74,15 @@ class Directory(FreshBase):
     def size(self):
         return 0
 
+    def in_use(self):
+        return self.use_count > 0
+
+    def inc_use(self):
+        self.use_count += 1
+
+    def dec_use(self):
+        self.use_count -= 1
+
     def checkupdate(self):
         if self.stale():
             try:
@@ -67,22 +90,25 @@ class Directory(FreshBase):
             except apiclient.errors.HttpError as e:
                 _logger.debug(e)
 
+    @use_counter
     def __getitem__(self, item):
         self.checkupdate()
         return self._entries[item]
 
+    @use_counter
     def items(self):
         self.checkupdate()
-        return self._entries.items()
-
-    def __iter__(self):
-        self.checkupdate()
-        return self._entries.iterkeys()
+        return list(self._entries.items())
 
+    @use_counter
     def __contains__(self, k):
         self.checkupdate()
         return k in self._entries
 
+    def fresh(self):
+        self.inodes.touch(self)
+        super(Directory, self).fresh()
+
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
 
@@ -132,17 +158,26 @@ class Directory(FreshBase):
 
         self.fresh()
 
-    def clear(self):
+    def clear(self, force=False):
         """Delete all entries"""
-        oldentries = self._entries
-        self._entries = {}
-        for n in oldentries:
-            if isinstance(n, Directory):
-                n.clear()
-            llfuse.invalidate_entry(self.inode, str(n))
-            self.inodes.del_entry(oldentries[n])
-        llfuse.invalidate_inode(self.inode)
-        self.invalidate()
+
+        if not self.in_use() or force:
+            oldentries = self._entries
+            self._entries = {}
+            for n in oldentries:
+                if isinstance(n, Directory):
+                    if not n.clear(force):
+                        self._entries = oldentries
+                        return False
+            for n in oldentries:
+                if isinstance(n, Directory):
+                    llfuse.invalidate_entry(self.inode, str(n))
+                    self.inodes.del_entry(oldentries[n])
+            llfuse.invalidate_inode(self.inode)
+            self.invalidate()
+            return True
+        else:
+            return False
 
     def mtime(self):
         return self._mtime
@@ -152,8 +187,7 @@ class CollectionDirectory(Directory):
     """Represents the root of a directory tree holding a collection."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, collection):
-        super(CollectionDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(CollectionDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.collection_object_file = None
@@ -187,14 +221,17 @@ class CollectionDirectory(Directory):
         if self.collection_object_file is not None:
             self.collection_object_file.update(self.collection_object)
 
-        self.clear()
+        self.clear(force=True)
         for s in coll_reader.all_streams():
             cwd = self
             for part in s.name().split('/'):
                 if part != '' and part != '.':
                     partname = sanitize_filename(part)
                     if partname not in cwd._entries:
-                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
+                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
+                        # (hack until using new API)
+                        cwd._entries[partname].inc_use()
+                        # end hack
                     cwd = cwd._entries[partname]
             for k, v in s.files().items():
                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
@@ -257,6 +294,16 @@ class CollectionDirectory(Directory):
         else:
             return super(CollectionDirectory, self).__contains__(k)
 
+    def invalidate(self):
+        super(CollectionDirectory, self).invalidate()
+        self.collection_object = None
+
+    def clear(self, force=False):
+        if self.collection_locator is None:
+            return False
+        else:
+            return super(CollectionDirectory, self).clear(force)
+
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
@@ -281,8 +328,7 @@ will appear if it exists.
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries):
-        super(MagicDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(MagicDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
 
@@ -308,6 +354,7 @@ will appear if it exists.
         try:
             e = self.inodes.add_entry(CollectionDirectory(
                     self.inode, self.inodes, self.api, self.num_retries, k))
+
             if e.update():
                 self._entries[k] = e
                 return True
@@ -323,28 +370,25 @@ will appear if it exists.
         else:
             raise KeyError("No collection with id " + item)
 
+    def clear(self, force=False):
+        pass
+
 
 class RecursiveInvalidateDirectory(Directory):
     def invalidate(self):
-        if self.inode == llfuse.ROOT_INODE:
-            llfuse.lock.acquire()
         try:
             super(RecursiveInvalidateDirectory, self).invalidate()
             for a in self._entries:
                 self._entries[a].invalidate()
         except Exception:
             _logger.exception()
-        finally:
-            if self.inode == llfuse.ROOT_INODE:
-                llfuse.lock.release()
 
 
 class TagsDirectory(RecursiveInvalidateDirectory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagsDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -370,8 +414,7 @@ class TagDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, tag,
                  poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
@@ -397,8 +440,7 @@ class ProjectDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                  poll=False, poll_time=60):
-        super(ProjectDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(ProjectDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
@@ -462,11 +504,6 @@ class ProjectDirectory(Directory):
 
             contents = arvados.util.list_all(self.api.groups().contents,
                                              self.num_retries, uuid=self.uuid)
-            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(
-                self.api.links().list, self.num_retries,
-                filters=[['tail_uuid', '=', self.uuid],
-                         ['link_class', '=', 'name']])
 
         # end with llfuse.lock_released, re-acquire lock
 
@@ -494,8 +531,7 @@ class SharedDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                  poll=False, poll_time=60):
-        super(SharedDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(SharedDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.current_user = api.users().current().execute(num_retries=num_retries)