21541: Code cleanup and additional memory usage improvements
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index 08a44c953380ea86f3fcf59dc37207bbea636a1e..2dfba464738868b5ced8e468e26e9eb1d5db9e34 100644 (file)
@@ -56,12 +56,6 @@ inode assigned to it and appears in the Inodes._entries dictionary.
 
 from __future__ import absolute_import
 from __future__ import division
-from future.utils import viewitems
-from future.utils import native
-from future.utils import listvalues
-from future.utils import listitems
-from future import standard_library
-standard_library.install_aliases()
 from builtins import next
 from builtins import str
 from builtins import object
@@ -81,9 +75,12 @@ import functools
 import arvados.keep
 from prometheus_client import Summary
 import queue
+from dataclasses import dataclass
+import typing
+import gc
 
 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from .fusefile import StringFile, FuseArvadosFile
+from .fusefile import File, StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -158,7 +155,6 @@ class InodeCache(object):
 
     def __init__(self, cap, min_entries=4):
         self._cache_entries = collections.OrderedDict()
-        self._by_uuid = {}
         self.cap = cap
         self._total = 0
         self.min_entries = min_entries
@@ -181,39 +177,21 @@ class InodeCache(object):
             return
 
         _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
-        for ent in listvalues(self._cache_entries):
+
+        # Copy this into a deque for two reasons:
+        #
+        # 1. _cache_entries is modified by unmanage() which is called
+        # by _remove
+        #
+        # 2. popping off the front means the reference goes away
+        # immediately instead of sticking around for the lifetime of
+        # "values"
+        values = collections.deque(self._cache_entries.values())
+
+        while len(values) > 0:
             if self._total < self.cap or len(self._cache_entries) < self.min_entries:
                 break
-            if ent.cache_size > 0 or ent.dead:
-                # if cache_size is zero it's been cleared already
-                yield ent
-
-    def manage(self, obj):
-        """Add a new object to be cache managed.
-
-        This means evict_candidates will suggest clearing and removing
-        the inode when there is memory pressure.
-
-        """
-
-        if obj.inode in self._cache_entries:
-            return
-
-        obj.cache_size = obj.objsize()
-        self._total += obj.cache_size
-
-        self._cache_entries[obj.inode] = obj
-
-        obj.cache_uuid = obj.uuid()
-        if obj.cache_uuid:
-            if obj.cache_uuid not in self._by_uuid:
-                self._by_uuid[obj.cache_uuid] = [obj]
-            else:
-                if obj not in self._by_uuid[obj.cache_uuid]:
-                    self._by_uuid[obj.cache_uuid].append(obj)
-
-        _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
-                      obj.inode, obj.cache_size, obj.cache_uuid, self._total, len(self._cache_entries))
+            yield values.popleft()
 
     def unmanage(self, entry):
         """Stop managing an object in the cache.
@@ -230,13 +208,6 @@ class InodeCache(object):
         self._total -= entry.cache_size
         entry.cache_size = 0
 
-        # manage the mapping of uuid to object
-        if entry.cache_uuid:
-            self._by_uuid[entry.cache_uuid].remove(entry)
-            if not self._by_uuid[entry.cache_uuid]:
-                del self._by_uuid[entry.cache_uuid]
-            entry.cache_uuid = None
-
         # Now forget about it
         del self._cache_entries[entry.inode]
 
@@ -245,11 +216,24 @@ class InodeCache(object):
         object changing (usually because it has been loaded or
         cleared).
 
+        Adds or removes entries to the cache list based on the object
+        cache size.
+
         """
+
+        if not obj.persisted():
+            return
+
         if obj.inode in self._cache_entries:
             self._total -= obj.cache_size
-            obj.cache_size = obj.objsize()
+
+        obj.cache_size = obj.objsize()
+
+        if obj.cache_size > 0 or obj.parent_inode is None:
             self._total += obj.cache_size
+            self._cache_entries[obj.inode] = obj
+        elif obj.cache_size == 0 and obj.inode in self._cache_entries:
+            del self._cache_entries[obj.inode]
 
     def touch(self, obj):
         """Indicate an object was used recently, making it low
@@ -258,17 +242,44 @@ class InodeCache(object):
         """
         if obj.inode in self._cache_entries:
             self._cache_entries.move_to_end(obj.inode)
-        else:
-            self.manage(obj)
-
-    def find_by_uuid(self, uuid):
-        return self._by_uuid.get(uuid, [])
+            return True
+        return False
 
     def clear(self):
         self._cache_entries.clear()
-        self._by_uuid.clear()
         self._total = 0
 
+@dataclass
+class RemoveInode:
+    entry: typing.Union[Directory, File]
+    def inode_op(self, inodes, locked_ops):
+        if locked_ops is None:
+            inodes._remove(self.entry)
+            return True
+        else:
+            locked_ops.append(self)
+            return False
+
+@dataclass
+class InvalidateInode:
+    inode: int
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_inode(self.inode)
+        return True
+
+@dataclass
+class InvalidateEntry:
+    inode: int
+    name: str
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_entry(self.inode, self.name)
+        return True
+
+@dataclass
+class EvictCandidates:
+    def inode_op(self, inodes, locked_ops):
+        return True
+
 
 class Inodes(object):
     """Manage the set of inodes.
@@ -291,6 +302,9 @@ class Inodes(object):
         self._inode_remove_thread.daemon = True
         self._inode_remove_thread.start()
 
+        self.cap_cache_event = threading.Event()
+        self._by_uuid = collections.defaultdict(list)
+
     def __getitem__(self, item):
         return self._entries[item]
 
@@ -301,7 +315,7 @@ class Inodes(object):
         return iter(self._entries.keys())
 
     def items(self):
-        return viewitems(self._entries.items())
+        return self._entries.items()
 
     def __contains__(self, k):
         return k in self._entries
@@ -313,12 +327,32 @@ class Inodes(object):
         """
 
         entry._atime = time.time()
-        self.inode_cache.touch(entry)
-        self.cap_cache()
+        if self.inode_cache.touch(entry):
+            self.cap_cache()
 
     def cap_cache(self):
         """Notify the _inode_remove thread to recheck the cache."""
-        self._inode_remove_queue.put(("evict_candidates",))
+        if not self.cap_cache_event.is_set():
+            self.cap_cache_event.set()
+            self._inode_remove_queue.put(EvictCandidates())
+
+    def update_uuid(self, entry):
+        """Update the Arvados uuid associated with an inode entry.
+
+        This is used to look up inodes that need to be invalidated
+        when a websocket event indicates the object has changed on the
+        API server.
+
+        """
+        if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].remove(entry)
+
+        entry.cache_uuid = entry.uuid()
+        if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].append(entry)
+
+        if not self._by_uuid[entry.cache_uuid]:
+            del self._by_uuid[entry.cache_uuid]
 
     def add_entry(self, entry):
         """Assign a numeric inode to a new entry."""
@@ -327,23 +361,23 @@ class Inodes(object):
         if entry.inode == llfuse.ROOT_INODE:
             entry.inc_ref()
         self._entries[entry.inode] = entry
-        if entry.persisted():
-            # only "persisted" items can be reloaded from the server
-            # making them safe to evict automatically.
-            self.inode_cache.manage(entry)
+
+        self.update_uuid(entry)
+        self.inode_cache.update_cache_size(entry)
         self.cap_cache()
         return entry
 
     def del_entry(self, entry):
         """Remove entry from the inode table.
 
-        Put a tombstone marker on it and notify the _inode_remove
-        thread to try and remove it.
+        Indicate this inode entry is pending deletion by setting
+        parent_inode to None.  Notify the _inode_remove thread to try
+        and remove it.
 
         """
 
-        entry.dead = True
-        self._inode_remove_queue.put(("remove", entry))
+        entry.parent_inode = None
+        self._inode_remove_queue.put(RemoveInode(entry))
         _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
 
     def _inode_remove(self):
@@ -355,61 +389,33 @@ class Inodes(object):
 
         locked_ops = collections.deque()
         while True:
-            try:
-                entry = self._inode_remove_queue.get(True)
-                if entry is None:
+            blocking_get = True
+            while True:
+                try:
+                    qentry = self._inode_remove_queue.get(blocking_get)
+                except queue.Empty:
+                    break
+                blocking_get = False
+                if qentry is None:
                     return
+
+                if self._shutdown_started.is_set():
+                    continue
+
                 # Process this entry
-                _logger.debug("_inode_remove %s", entry)
-                if self._inode_op(entry, locked_ops):
+                if qentry.inode_op(self, locked_ops):
                     self._inode_remove_queue.task_done()
 
-                # Drain the queue of any other entries
-                while True:
-                    try:
-                        entry = self._inode_remove_queue.get(False)
-                        if entry is None:
-                            return
-                        _logger.debug("_inode_remove %s", entry)
-                        if self._inode_op(entry, locked_ops):
-                            self._inode_remove_queue.task_done()
-                    except queue.Empty:
-                        break
-
-                with llfuse.lock:
-                    while len(locked_ops) > 0:
-                        if self._inode_op(locked_ops.popleft(), None):
-                            self._inode_remove_queue.task_done()
-                    for entry in self.inode_cache.evict_candidates():
-                        self._remove(entry)
-            except Exception as e:
-                _logger.exception("_inode_remove")
+                # Give up the reference
+                qentry = None
 
-    def _inode_op(self, op, locked_ops):
-        """Process an inode operation: attempt to remove an inode
-        entry, tell the kernel to invalidate a inode metadata or
-        directory entry, or trigger a cache check.
-
-        """
-        if self._shutdown_started.is_set():
-            return True
-        if op[0] == "remove":
-            if locked_ops is None:
-                self._remove(op[1])
-                return True
-            else:
-                locked_ops.append(op)
-                return False
-        if op[0] == "invalidate_inode":
-            _logger.debug("sending invalidate inode %i", op[1])
-            llfuse.invalidate_inode(op[1])
-            return True
-        if op[0] == "invalidate_entry":
-            _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
-            llfuse.invalidate_entry(op[1], op[2])
-            return True
-        if op[0] == "evict_candidates":
-            return True
+            with llfuse.lock:
+                while len(locked_ops) > 0:
+                    if locked_ops.popleft().inode_op(self, None):
+                        self._inode_remove_queue.task_done()
+                self.cap_cache_event.clear()
+                for entry in self.inode_cache.evict_candidates():
+                    self._remove(entry)
 
     def wait_remove_queue_empty(self):
         # used by tests
@@ -430,13 +436,7 @@ class Inodes(object):
                 # Removed already
                 return
 
-            # Tell the kernel it should forget about it.
-            entry.kernel_invalidate()
-
-            if entry.has_ref():
-                # has kernel reference, could still be accessed.
-                # when the kernel forgets about it, we can delete it.
-                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+            if entry.inode == llfuse.ROOT_INODE:
                 return
 
             if entry.in_use():
@@ -444,35 +444,38 @@ class Inodes(object):
                 #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
                 return
 
-            forget_inode = True
-            parent = self._entries.get(entry.parent_inode)
-            if (parent is not None and parent.has_ref()) or entry.inode == llfuse.ROOT_INODE:
-                # the parent is still referenced, so we'll keep the
-                # entry but wipe out the stuff under it
-                forget_inode = False
+            # Tell the kernel it should forget about it
+            entry.kernel_invalidate()
 
-            if entry.cache_size == 0 and not forget_inode:
-                # Was cleared already
+            if entry.has_ref():
+                # has kernel reference, could still be accessed.
+                # when the kernel forgets about it, we can delete it.
+                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
                 return
 
-            if forget_inode:
-                self.inode_cache.unmanage(entry)
-
-            _logger.debug("InodeCache removing inode %i", entry.inode)
+            # commit any pending changes
+            with llfuse.lock_released:
+                entry.finalize()
 
-            # For directories, clear the contents
+            # Clear the contents
             entry.clear()
 
-            _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s, inode entries %i, type %s",
-                          entry.inode, self.inode_cache.total(), forget_inode,
-                          len(self._entries), type(entry))
-            if forget_inode:
+            if entry.parent_inode is None:
+                _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
+                              entry.inode, entry.cache_size, self.inode_cache.total(),
+                              len(self._entries), type(entry))
+
+                if entry.cache_uuid:
+                    self._by_uuid[entry.cache_uuid].remove(entry)
+                    if not self._by_uuid[entry.cache_uuid]:
+                        del self._by_uuid[entry.cache_uuid]
+                    entry.cache_uuid = None
+
+                self.inode_cache.unmanage(entry)
+
                 del self._entries[entry.inode]
                 entry.inode = None
 
-            # stop anything else
-            with llfuse.lock_released:
-                entry.finalize()
         except Exception as e:
             _logger.exception("failed remove")
 
@@ -480,13 +483,13 @@ class Inodes(object):
         if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            self._inode_remove_queue.put(("invalidate_inode", entry.inode))
+            self._inode_remove_queue.put(InvalidateInode(entry.inode))
 
     def invalidate_entry(self, entry, name):
         if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
+            self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
 
     def begin_shutdown(self):
         self._inode_remove_queue.put(None)
@@ -499,8 +502,9 @@ class Inodes(object):
             self.begin_shutdown()
 
         self.inode_cache.clear()
+        self._by_uuid.clear()
 
-        for k,v in viewitems(self._entries):
+        for k,v in self._entries.items():
             try:
                 v.finalize()
             except Exception as e:
@@ -511,6 +515,11 @@ class Inodes(object):
     def forward_slash_subst(self):
         return self._fsns
 
+    def find_by_uuid(self, uuid):
+        """Return a list of zero or more inode entries corresponding
+        to this Arvados UUID."""
+        return self._by_uuid.get(uuid, [])
+
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -617,6 +626,13 @@ class Operations(llfuse.Operations):
         self.write_ops_counter = arvados.keep.Counter()
 
         self.events = None
+
+        # We rely on the cyclic garbage collector to deallocate
+        # Collection objects from the Python SDK.  A lower GC
+        # threshold encourages Python to be more aggressive in
+        # reclaiming these and seems to slow down the growth in memory
+        # usage over time.
+        gc.set_threshold(200)
 
     def init(self):
         # Allow threads that are waiting for the driver to be finished
@@ -653,7 +669,8 @@ class Operations(llfuse.Operations):
     def destroy(self):
         _logger.debug("arv-mount destroy: start")
 
-        self.begin_shutdown()
+        with llfuse.lock_released:
+            self.begin_shutdown()
 
         if self.events:
             self.events.close()
@@ -683,14 +700,14 @@ class Operations(llfuse.Operations):
             old_attrs = properties.get("old_attributes") or {}
             new_attrs = properties.get("new_attributes") or {}
 
-            for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
+            for item in self.inodes.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
 
             oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
-                    self.inodes.inode_cache.find_by_uuid(oldowner) +
-                    self.inodes.inode_cache.find_by_uuid(newowner)):
+                    self.inodes.find_by_uuid(oldowner) +
+                    self.inodes.find_by_uuid(newowner)):
                 parent.invalidate()
 
     @getattr_time.time()
@@ -701,11 +718,16 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.ENOENT)
 
         e = self.inodes[inode]
+        self.inodes.touch(e)
+        parent = None
+        if e.parent_inode:
+            parent = self.inodes[e.parent_inode]
+            self.inodes.touch(parent)
 
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 0
+        entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
@@ -779,11 +801,17 @@ class Operations(llfuse.Operations):
             if name == '..':
                 inode = p.parent_inode
             elif isinstance(p, Directory) and name in p:
+                if p[name].inode is None:
+                    _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
+                                  parent_inode, name)
+                    raise llfuse.FUSEError(errno.ENOENT)
+
                 inode = p[name].inode
 
         if inode != None:
             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                       parent_inode, name, inode)
+            self.inodes.touch(self.inodes[inode])
             self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
@@ -799,7 +827,7 @@ class Operations(llfuse.Operations):
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
-            if ent.dec_ref(nlookup) == 0 and ent.dead:
+            if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
                 self.inodes.del_entry(ent)
 
     @open_time.time()
@@ -919,10 +947,10 @@ class Operations(llfuse.Operations):
         _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
 
         # update atime
-        self.inodes.touch(p)
         p.inc_use()
-        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
         p.dec_use()
+        self.inodes.touch(p)
         return fh
 
     @readdir_time.time()