21541: Fix KeyError, segfaults, and memory use issues
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index 31afcda8d12267970631372014706793ef95c9f3..08a44c953380ea86f3fcf59dc37207bbea636a1e 100644 (file)
@@ -47,6 +47,11 @@ The general FUSE operation flow is as follows:
 The FUSE driver supports the Arvados event bus.  When an event is received for
 an object that is live in the inode cache, that object is immediately updated.
 
+Implementation note: in the code, the terms 'object', 'entry' and
+'inode' are used somewhat interchangeably, but generally mean an
+arvados_fuse.File or arvados_fuse.Directory object which has a
+numeric inode assigned to it and appears in the Inodes._entries
+dictionary.
+
 """
 
 from __future__ import absolute_import
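
To make the terminology above concrete, here is a purely illustrative sketch (hypothetical names, not the real classes) of the relationship it describes: a numeric inode keys a dictionary whose value is the File/Directory object, so 'object', 'entry' and 'inode' all refer to the same thing.

    import itertools

    ROOT_INODE = 1                     # llfuse.ROOT_INODE is 1 in practice
    inode_counter = itertools.count(ROOT_INODE)
    entries = {}                       # numeric inode -> File/Directory-like object

    class ExampleEntry:
        inode = None

    ent = ExampleEntry()
    ent.inode = next(inode_counter)    # assign a numeric inode
    entries[ent.inode] = ent           # register it in the entries table
    assert entries[ent.inode] is ent
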
@@ -77,19 +82,6 @@ import arvados.keep
 from prometheus_client import Summary
 import queue
 
-# Default _notify_queue has a limit of 1000 items, but it really needs to be
-# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
-# details.
-
-if hasattr(llfuse, 'capi'):
-    # llfuse < 0.42
-    llfuse.capi._notify_queue = queue.Queue()
-else:
-    # llfuse >= 0.42
-    llfuse._notify_queue = queue.Queue()
-
-LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
-
 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
 from .fusefile import StringFile, FuseArvadosFile
 
@@ -128,27 +120,44 @@ class FileHandle(Handle):
 
 class DirectoryHandle(Handle):
     """Connects a numeric file handle to a Directory object that has
-    been opened by the client."""
+    been opened by the client.
+
+    DirectoryHandle is used by opendir() and readdir() to get
+    directory listings.  Entries returned by readdir() don't increment
+    the lookup count (kernel references), so we increment our internal
+    "use count" to avoid having an item removed mid-read.
+
+    """
 
     def __init__(self, fh, dirobj, entries):
         super(DirectoryHandle, self).__init__(fh, dirobj)
         self.entries = entries
 
+        for ent in self.entries:
+            ent[1].inc_use()
+
+    def release(self):
+        for ent in self.entries:
+            ent[1].dec_use()
+        super(DirectoryHandle, self).release()
+
 
 class InodeCache(object):
     """Records the memory footprint of objects and when they are last used.
 
-    When the cache limit is exceeded, the least recently used objects are
-    cleared.  Clearing the object means discarding its contents to release
-    memory.  The next time the object is accessed, it must be re-fetched from
-    the server.  Note that the inode cache limit is a soft limit; the cache
-    limit may be exceeded if necessary to load very large objects, it may also
-    be exceeded if open file handles prevent objects from being cleared.
+    When the cache limit is exceeded, the least recently used objects
+    are cleared.  Clearing the object means discarding its contents to
+    release memory.  The next time the object is accessed, it must be
+    re-fetched from the server.  Note that the inode cache limit is a
+    soft limit; the cache limit may be exceeded if necessary to load
+    very large projects or collections.  It may also be exceeded if
+    an inode can't be safely discarded because of kernel lookups
+    (has_ref()) or the internal use count (in_use()).
 
     """
 
     def __init__(self, cap, min_entries=4):
-        self._entries = collections.OrderedDict()
+        self._cache_entries = collections.OrderedDict()
         self._by_uuid = {}
         self.cap = cap
         self._total = 0
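
As a sketch of the use-count pattern DirectoryHandle adds above (hypothetical names; the real inc_use()/dec_use() live on the File/Directory objects): every entry captured by an open directory handle is pinned with an internal counter and released when the handle is closed, so a cache sweep that checks in_use() will not discard an entry mid-readdir.

    class PinnableEntry:
        def __init__(self, name):
            self.name = name
            self.use_count = 0

        def inc_use(self):
            self.use_count += 1

        def dec_use(self):
            self.use_count -= 1

        def in_use(self):
            return self.use_count > 0

    class SimpleDirHandle:
        def __init__(self, entries):
            self.entries = entries
            for ent in self.entries:
                ent.inc_use()          # pin every listed entry

        def release(self):
            for ent in self.entries:
                ent.dec_use()          # unpin when the handle is closed

    ents = [PinnableEntry("a"), PinnableEntry("b")]
    handle = SimpleDirHandle(ents)
    assert all(e.in_use() for e in ents)      # safe from eviction while open
    handle.release()
    assert not any(e.in_use() for e in ents)  # eligible for eviction again
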
@@ -157,104 +166,130 @@ class InodeCache(object):
     def total(self):
         return self._total
 
-    def _remove(self, obj, clear):
-        if clear:
-            # Kernel behavior seems to be that if a file is
-            # referenced, its parents remain referenced too. This
-            # means has_ref() exits early when a collection is not
-            # candidate for eviction.
-            #
-            # By contrast, in_use() doesn't increment references on
-            # parents, so it requires a full tree walk to determine if
-            # a collection is a candidate for eviction.  This takes
-            # .07s for 240000 files, which becomes a major drag when
-            # cap_cache is being called several times a second and
-            # there are multiple non-evictable collections in the
-            # cache.
-            #
-            # So it is important for performance that we do the
-            # has_ref() check first.
-
-            if obj.has_ref(True):
-                _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
-                return
+    def evict_candidates(self):
+        """Yield entries that are candidates to be evicted
+        and stop when the cache total has shrunk sufficiently.
 
-            if obj.in_use():
-                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+        Implements an LRU cache: when an item is added or touch()ed it
+        goes to the back of the OrderedDict, so items in the front are
+        oldest.  The Inodes._remove() function determines if the entry
+        can actually be removed safely.
 
-            obj.kernel_invalidate()
-            _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
-            obj.clear()
+        """
 
-        # The llfuse lock is released in del_entry(), which is called by
-        # Directory.clear().  While the llfuse lock is released, it can happen
-        # that a reentrant call removes this entry before this call gets to it.
-        # Ensure that the entry is still valid before trying to remove it.
-        if obj.inode not in self._entries:
+        if self._total <= self.cap:
             return
 
-        self._total -= obj.cache_size
-        del self._entries[obj.inode]
+        _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
+        for ent in listvalues(self._cache_entries):
+            if self._total < self.cap or len(self._cache_entries) < self.min_entries:
+                break
+            if ent.cache_size > 0 or ent.dead:
+                # If cache_size is zero it has been cleared already;
+                # dead entries are yielded regardless so they can be
+                # removed from the inode table.
+                yield ent
+
+    def manage(self, obj):
+        """Add a new object to be cache managed.
+
+        This means evict_candidates will suggest clearing and removing
+        the inode when there is memory pressure.
+
+        """
+
+        if obj.inode in self._cache_entries:
+            return
+
+        obj.cache_size = obj.objsize()
+        self._total += obj.cache_size
+
+        self._cache_entries[obj.inode] = obj
+
+        obj.cache_uuid = obj.uuid()
         if obj.cache_uuid:
-            self._by_uuid[obj.cache_uuid].remove(obj)
-            if not self._by_uuid[obj.cache_uuid]:
-                del self._by_uuid[obj.cache_uuid]
-            obj.cache_uuid = None
-        if clear:
-            _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
+            if obj.cache_uuid not in self._by_uuid:
+                self._by_uuid[obj.cache_uuid] = [obj]
+            else:
+                if obj not in self._by_uuid[obj.cache_uuid]:
+                    self._by_uuid[obj.cache_uuid].append(obj)
 
-    def cap_cache(self):
-        if self._total > self.cap:
-            for ent in listvalues(self._entries):
-                if self._total < self.cap or len(self._entries) < self.min_entries:
-                    break
-                self._remove(ent, True)
+        _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
+                      obj.inode, obj.cache_size, obj.cache_uuid, self._total, len(self._cache_entries))
 
-    def manage(self, obj):
-        if obj.persisted():
+    def unmanage(self, entry):
+        """Stop managing an object in the cache.
+
+        This happens when an object is being removed from the inode
+        entries table.
+
+        """
+
+        if entry.inode not in self._cache_entries:
+            return
+
+        # manage cache size running sum
+        self._total -= entry.cache_size
+        entry.cache_size = 0
+
+        # manage the mapping of uuid to object
+        if entry.cache_uuid:
+            self._by_uuid[entry.cache_uuid].remove(entry)
+            if not self._by_uuid[entry.cache_uuid]:
+                del self._by_uuid[entry.cache_uuid]
+            entry.cache_uuid = None
+
+        # Now forget about it
+        del self._cache_entries[entry.inode]
+
+    def update_cache_size(self, obj):
+        """Update the cache total in response to the footprint of an
+        object changing (usually because it has been loaded or
+        cleared).
+
+        """
+        if obj.inode in self._cache_entries:
+            self._total -= obj.cache_size
             obj.cache_size = obj.objsize()
-            self._entries[obj.inode] = obj
-            obj.cache_uuid = obj.uuid()
-            if obj.cache_uuid:
-                if obj.cache_uuid not in self._by_uuid:
-                    self._by_uuid[obj.cache_uuid] = [obj]
-                else:
-                    if obj not in self._by_uuid[obj.cache_uuid]:
-                        self._by_uuid[obj.cache_uuid].append(obj)
-            self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
-                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
-            self.cap_cache()
+            self._total += obj.cache_size
 
     def touch(self, obj):
-        if obj.persisted():
-            if obj.inode in self._entries:
-                self._remove(obj, False)
-            self.manage(obj)
+        """Indicate an object was used recently, making it low
+        priority to be removed from the cache.
 
-    def unmanage(self, obj):
-        if obj.persisted() and obj.inode in self._entries:
-            self._remove(obj, True)
+        """
+        if obj.inode in self._cache_entries:
+            self._cache_entries.move_to_end(obj.inode)
+        else:
+            self.manage(obj)
 
     def find_by_uuid(self, uuid):
         return self._by_uuid.get(uuid, [])
 
     def clear(self):
-        self._entries.clear()
+        self._cache_entries.clear()
         self._by_uuid.clear()
         self._total = 0
 
+
 class Inodes(object):
-    """Manage the set of inodes.  This is the mapping from a numeric id
-    to a concrete File or Directory object"""
+    """Manage the set of inodes.
+
+    This is the mapping from a numeric id to a concrete File or
+    Directory object.
 
-    def __init__(self, inode_cache, encoding="utf-8"):
+    """
+
+    def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
         self.encoding = encoding
-        self.deferred_invalidations = []
+        self._fsns = fsns
+        self._shutdown_started = shutdown_started or threading.Event()
+
+        self._inode_remove_queue = queue.Queue()
+        self._inode_remove_thread = threading.Thread(None, self._inode_remove)
+        self._inode_remove_thread.daemon = True
+        self._inode_remove_thread.start()
 
     def __getitem__(self, item):
         return self._entries[item]
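
The LRU bookkeeping described in the docstrings above boils down to an OrderedDict plus a running size total. A self-contained sketch of that pattern (hypothetical names; the real logic is in InodeCache, and removal additionally consults has_ref()/in_use() before anything is discarded):

    import collections

    class TinyLRU:
        def __init__(self, cap, min_entries=2):
            self._entries = collections.OrderedDict()  # key -> size, oldest first
            self._total = 0
            self.cap = cap                              # soft limit on the total
            self.min_entries = min_entries

        def manage(self, key, size):
            if key not in self._entries:
                self._entries[key] = size
                self._total += size

        def touch(self, key):
            if key in self._entries:
                self._entries.move_to_end(key)          # most recently used -> back

        def update_size(self, key, new_size):
            # footprint changed, e.g. the object was loaded or cleared
            self._total += new_size - self._entries[key]
            self._entries[key] = new_size

        def evict_candidates(self):
            if self._total <= self.cap:
                return
            for key in list(self._entries):              # front of the dict = oldest
                if self._total < self.cap or len(self._entries) < self.min_entries:
                    break
                yield key                                 # caller decides if removal is safe

        def unmanage(self, key):
            self._total -= self._entries.pop(key, 0)

    lru = TinyLRU(cap=10)
    for key, size in [("a", 6), ("b", 6), ("c", 1)]:
        lru.manage(key, size)
    lru.touch("a")                        # "b" is now the oldest entry
    for key in lru.evict_candidates():
        lru.unmanage(key)                 # evicts "b"; total drops to 7, under the cap
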
@@ -272,41 +307,197 @@ class Inodes(object):
         return k in self._entries
 
     def touch(self, entry):
+        """Update the access time, adjust the cache position, and
+        notify the _inode_remove thread to recheck the cache.
+
+        """
+
         entry._atime = time.time()
         self.inode_cache.touch(entry)
+        self.cap_cache()
+
+    def cap_cache(self):
+        """Notify the _inode_remove thread to recheck the cache."""
+        self._inode_remove_queue.put(("evict_candidates",))
 
     def add_entry(self, entry):
+        """Assign a numeric inode to a new entry."""
+
         entry.inode = next(self._counter)
         if entry.inode == llfuse.ROOT_INODE:
             entry.inc_ref()
         self._entries[entry.inode] = entry
-        self.inode_cache.manage(entry)
+        if entry.persisted():
+            # only "persisted" items can be reloaded from the server
+            # making them safe to evict automatically.
+            self.inode_cache.manage(entry)
+        self.cap_cache()
         return entry
 
     def del_entry(self, entry):
-        if entry.ref_count == 0:
-            self.inode_cache.unmanage(entry)
-            del self._entries[entry.inode]
+        """Remove entry from the inode table.
+
+        Put a tombstone marker on it and notify the _inode_remove
+        thread to try to remove it.
+
+        """
+
+        entry.dead = True
+        self._inode_remove_queue.put(("remove", entry))
+        _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+    def _inode_remove(self):
+        """Background thread to handle tasks related to invalidating
+        inodes in the kernel and removing objects from the inode
+        table entirely.
+
+        """
+
+        locked_ops = collections.deque()
+        while True:
+            try:
+                entry = self._inode_remove_queue.get(True)
+                if entry is None:
+                    return
+                # Process this entry
+                _logger.debug("_inode_remove %s", entry)
+                if self._inode_op(entry, locked_ops):
+                    self._inode_remove_queue.task_done()
+
+                # Drain the queue of any other entries
+                while True:
+                    try:
+                        entry = self._inode_remove_queue.get(False)
+                        if entry is None:
+                            return
+                        _logger.debug("_inode_remove %s", entry)
+                        if self._inode_op(entry, locked_ops):
+                            self._inode_remove_queue.task_done()
+                    except queue.Empty:
+                        break
+
+                with llfuse.lock:
+                    while len(locked_ops) > 0:
+                        if self._inode_op(locked_ops.popleft(), None):
+                            self._inode_remove_queue.task_done()
+                    for entry in self.inode_cache.evict_candidates():
+                        self._remove(entry)
+            except Exception as e:
+                _logger.exception("_inode_remove")
+
+    def _inode_op(self, op, locked_ops):
+        """Process an inode operation: attempt to remove an inode
+        entry, tell the kernel to invalidate an inode's metadata or a
+        directory entry, or trigger a cache check.
+
+        """
+        if self._shutdown_started.is_set():
+            return True
+        if op[0] == "remove":
+            if locked_ops is None:
+                self._remove(op[1])
+                return True
+            else:
+                locked_ops.append(op)
+                return False
+        if op[0] == "invalidate_inode":
+            _logger.debug("sending invalidate inode %i", op[1])
+            llfuse.invalidate_inode(op[1])
+            return True
+        if op[0] == "invalidate_entry":
+            _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
+            llfuse.invalidate_entry(op[1], op[2])
+            return True
+        if op[0] == "evict_candidates":
+            return True
+
+    def wait_remove_queue_empty(self):
+        # used by tests
+        self._inode_remove_queue.join()
+
+    def _remove(self, entry):
+        """Remove an inode entry if possible.
+
+        If the entry is still referenced or in use, don't do anything.
+        If the entry is not referenced but its parent is still
+        referenced, clear any data held by the object (which may
+        include directory entries under the object) but don't remove
+        it from the inode table.
+
+        """
+        try:
+            if entry.inode is None:
+                # Removed already
+                return
+
+            # Tell the kernel it should forget about it.
+            entry.kernel_invalidate()
+
+            if entry.has_ref():
+                # has kernel reference, could still be accessed.
+                # when the kernel forgets about it, we can delete it.
+                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+                return
+
+            if entry.in_use():
+                # referenced internally, stay pinned
+                #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+                return
+
+            forget_inode = True
+            parent = self._entries.get(entry.parent_inode)
+            if (parent is not None and parent.has_ref()) or entry.inode == llfuse.ROOT_INODE:
+                # the parent is still referenced, so we'll keep the
+                # entry but wipe out the stuff under it
+                forget_inode = False
+
+            if entry.cache_size == 0 and not forget_inode:
+                # Was cleared already
+                return
+
+            if forget_inode:
+                self.inode_cache.unmanage(entry)
+
+            _logger.debug("InodeCache removing inode %i", entry.inode)
+
+            # For directories, clear the contents
+            entry.clear()
+
+            _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s, inode entries %i, type %s",
+                          entry.inode, self.inode_cache.total(), forget_inode,
+                          len(self._entries), type(entry))
+            if forget_inode:
+                del self._entries[entry.inode]
+                entry.inode = None
+
+            # Stop any remaining activity on this entry.
             with llfuse.lock_released:
                 entry.finalize()
-            entry.inode = None
-        else:
-            entry.dead = True
-            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+        except Exception as e:
+            _logger.exception("failed remove")
 
     def invalidate_inode(self, entry):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_inode(entry.inode)
+            self._inode_remove_queue.put(("invalidate_inode", entry.inode))
 
     def invalidate_entry(self, entry, name):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+            self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
+
+    def begin_shutdown(self):
+        self._inode_remove_queue.put(None)
+        if self._inode_remove_thread is not None:
+            self._inode_remove_thread.join()
+        self._inode_remove_thread = None
 
     def clear(self):
+        with llfuse.lock_released:
+            self.begin_shutdown()
+
         self.inode_cache.clear()
 
         for k,v in viewitems(self._entries):
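
The removal machinery added above funnels everything through one background thread: operations are queued, the thread drains whatever is immediately available, and work that needs the big FUSE lock is deferred and applied in one batch per lock acquisition. A simplified, standard-library-only sketch of that pattern (hypothetical names; the real thread also dispatches invalidate ops, runs eviction passes, and shuts down on a None sentinel):

    import collections
    import queue
    import threading

    ops = queue.Queue()
    big_lock = threading.Lock()            # stands in for llfuse.lock
    processed = []

    def apply_op(op):
        processed.append(op)               # the real code removes/invalidates inodes here

    def worker():
        stopping = False
        while not stopping:
            batch = collections.deque()
            op = ops.get()                 # block until there is something to do
            if op is None:
                return                     # sentinel: shut the thread down
            batch.append(op)
            while True:                    # drain anything else already queued
                try:
                    op = ops.get(block=False)
                except queue.Empty:
                    break
                if op is None:
                    stopping = True
                    break
                batch.append(op)
            with big_lock:                 # one lock acquisition per batch
                while batch:
                    apply_op(batch.popleft())

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    for inode in (101, 102, 103):
        ops.put(("remove", inode))
    ops.put(None)                          # ask the worker to exit
    t.join()
    assert len(processed) == 3
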
@@ -317,6 +508,9 @@ class Inodes(object):
 
         self._entries.clear()
 
+    def forward_slash_subst(self):
+        return self._fsns
+
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -377,14 +571,32 @@ class Operations(llfuse.Operations):
     rename_time = fuse_time.labels(op='rename')
     flush_time = fuse_time.labels(op='flush')
 
-    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
+    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
         super(Operations, self).__init__()
 
         self._api_client = api_client
 
         if not inode_cache:
             inode_cache = InodeCache(cap=256*1024*1024)
-        self.inodes = Inodes(inode_cache, encoding=encoding)
+
+        if fsns is None:
+            try:
+                fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
+            except KeyError:
+                # old API server with no FSNS config
+                fsns = '_'
+            else:
+                if fsns == '' or fsns == '/':
+                    fsns = None
+
+        # If we get overlapping shutdown events (e.g., fusermount -u
+        # -z and operations.destroy()) llfuse calls forget() on inodes
+        # that have already been deleted. To avoid this, we make
+        # forget() a no-op if called after destroy().
+        self._shutdown_started = threading.Event()
+
+        self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
+                             shutdown_started=self._shutdown_started)
         self.uid = uid
         self.gid = gid
         self.enable_write = enable_write
@@ -397,12 +609,6 @@ class Operations(llfuse.Operations):
         # is fully initialized should wait() on this event object.
         self.initlock = threading.Event()
 
-        # If we get overlapping shutdown events (e.g., fusermount -u
-        # -z and operations.destroy()) llfuse calls forget() on inodes
-        # that have already been deleted. To avoid this, we make
-        # forget() a no-op if called after destroy().
-        self._shutdown_started = threading.Event()
-
         self.num_retries = num_retries
 
         self.read_counter = arvados.keep.Counter()
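
The ForwardSlashNameSubstitution handling above only reads the value from the API config ('_' is assumed for old servers; '' or '/' disables substitution); applying it to names happens elsewhere in the directory code. A hedged sketch of what that application could look like (the helper name and call site are hypothetical):

    def sanitize_name(name, fsns):
        # fsns is None when substitution is disabled, otherwise it is
        # the string used to replace '/' in collection/project names.
        if fsns is None:
            return name
        return name.replace('/', fsns)

    assert sanitize_name("sub/project", "_") == "sub_project"
    assert sanitize_name("sub/project", None) == "sub/project"
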
@@ -438,23 +644,25 @@ class Operations(llfuse.Operations):
     def metric_count_func(self, opname):
         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
 
+    def begin_shutdown(self):
+        self._shutdown_started.set()
+        self.inodes.begin_shutdown()
+
     @destroy_time.time()
     @catch_exceptions
     def destroy(self):
-        self._shutdown_started.set()
+        _logger.debug("arv-mount destroy: start")
+
+        self.begin_shutdown()
+
         if self.events:
             self.events.close()
             self.events = None
 
-        # Different versions of llfuse require and forbid us to
-        # acquire the lock here. See #8345#note-37, #10805#note-9.
-        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
-            # llfuse < 0.42
-            self.inodes.clear()
-            llfuse.lock.release()
-        else:
-            # llfuse >= 0.42
-            self.inodes.clear()
+        self.inodes.clear()
+
+        _logger.debug("arv-mount destroy: complete")
+
 
     def access(self, inode, mode, ctx):
         return True
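
The shutdown path above relies on two small pieces: a threading.Event that turns late kernel callbacks (such as forget()) into no-ops, and a queue sentinel plus join() that stops the background inode-removal thread before the inode table is cleared. A minimal sketch of that pattern (standard library only, hypothetical names):

    import queue
    import threading

    class MiniOps:
        def __init__(self):
            self._shutdown_started = threading.Event()
            self._queue = queue.Queue()
            self._worker = threading.Thread(target=self._run, daemon=True)
            self._worker.start()

        def _run(self):
            while True:
                item = self._queue.get()
                if item is None:           # sentinel: exit the worker
                    return

        def forget(self, inode):
            if self._shutdown_started.is_set():
                return                     # ignore late callbacks after destroy()
            # ... normal forget handling would go here ...

        def destroy(self):
            self._shutdown_started.set()
            self._queue.put(None)          # ask the worker to stop
            self._worker.join()

    ops = MiniOps()
    ops.destroy()
    ops.forget(42)                         # safe no-op after shutdown
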
@@ -489,6 +697,7 @@ class Operations(llfuse.Operations):
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
+            _logger.debug("arv-mount getattr: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         e = self.inodes[inode]
@@ -564,14 +773,13 @@ class Operations(llfuse.Operations):
 
         if name == '.':
             inode = parent_inode
-        else:
-            if parent_inode in self.inodes:
-                p = self.inodes[parent_inode]
-                self.inodes.touch(p)
-                if name == '..':
-                    inode = p.parent_inode
-                elif isinstance(p, Directory) and name in p:
-                    inode = p[name].inode
+        elif parent_inode in self.inodes:
+            p = self.inodes[parent_inode]
+            self.inodes.touch(p)
+            if name == '..':
+                inode = p.parent_inode
+            elif isinstance(p, Directory) and name in p:
+                inode = p[name].inode
 
         if inode != None:
             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
@@ -600,6 +808,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.debug("arv-mount open: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if isinstance(p, Directory):
@@ -681,7 +890,7 @@ class Operations(llfuse.Operations):
             finally:
                 self._filehandles[fh].release()
                 del self._filehandles[fh]
-        self.inodes.inode_cache.cap_cache()
+        self.inodes.cap_cache()
 
     def releasedir(self, fh):
         self.release(fh)
@@ -694,6 +903,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if not isinstance(p, Directory):
@@ -703,11 +913,16 @@ class Operations(llfuse.Operations):
         if p.parent_inode in self.inodes:
             parent = self.inodes[p.parent_inode]
         else:
+            _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
             raise llfuse.FUSEError(errno.EIO)
 
+        _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
+
         # update atime
         self.inodes.touch(p)
+        p.inc_use()
         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+        p.dec_use()
         return fh
 
     @readdir_time.time()
@@ -722,8 +937,9 @@ class Operations(llfuse.Operations):
 
         e = off
         while e < len(handle.entries):
-            if handle.entries[e][1].inode in self.inodes:
-                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
+            ent = handle.entries[e]
+            if ent[1].inode in self.inodes:
+                yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
             e += 1
 
     @statfs_time.time()