Merge branch '21224-project-details'
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index 93bcf04978f9b00d1f361a568f90a088b0ea4e6f..c39afa4757cb0ba9a5b415bf7bdb6f7a0e1d8318 100644 (file)
@@ -47,58 +47,34 @@ The general FUSE operation flow is as follows:
 The FUSE driver supports the Arvados event bus.  When an event is received for
 an object that is live in the inode cache, that object is immediately updated.
 
 The FUSE driver supports the Arvados event bus.  When an event is received for
 an object that is live in the inode cache, that object is immediately updated.
 
+Implementation note: in the code, the terms 'object', 'entry' and
+'inode' are used somewhat interchangeably, but generally mean an
+arvados_fuse.File or arvados_fuse.Directory object which has a numeric
+inode assigned to it and appears in the Inodes._entries dictionary.
+
 """
 
 """
 
-from __future__ import absolute_import
-from __future__ import division
-from future.utils import viewitems
-from future.utils import native
-from future.utils import listvalues
-from future import standard_library
-standard_library.install_aliases()
-from builtins import next
-from builtins import str
-from builtins import object
 import os
 import os
-import sys
 import llfuse
 import errno
 import stat
 import threading
 import arvados
 import llfuse
 import errno
 import stat
 import threading
 import arvados
-import pprint
 import arvados.events
 import arvados.events
-import re
-import apiclient
-import json
 import logging
 import time
 import logging
 import time
-import _strptime
-import calendar
 import threading
 import itertools
 import threading
 import itertools
-import ciso8601
 import collections
 import functools
 import arvados.keep
 from prometheus_client import Summary
 import queue
 import collections
 import functools
 import arvados.keep
 from prometheus_client import Summary
 import queue
+from dataclasses import dataclass
+import typing
 
 
-# Default _notify_queue has a limit of 1000 items, but it really needs to be
-# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
-# details.
-
-if hasattr(llfuse, 'capi'):
-    # llfuse < 0.42
-    llfuse.capi._notify_queue = queue.Queue()
-else:
-    # llfuse >= 0.42
-    llfuse._notify_queue = queue.Queue()
-
-LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
-
-from .fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from .fusefile import StringFile, FuseArvadosFile
+from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from .fusefile import File, StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -135,28 +111,47 @@ class FileHandle(Handle):
 
 class DirectoryHandle(Handle):
     """Connects a numeric file handle to a Directory object that has
 
 class DirectoryHandle(Handle):
     """Connects a numeric file handle to a Directory object that has
-    been opened by the client."""
+    been opened by the client.
+
+    DirectoryHandle is used by opendir() and readdir() to get
+    directory listings.  Entries returned by readdir() don't increment
+    the lookup count (kernel references), so we increment our internal
+    "use count" to avoid having an item removed mid-read.
+
+    """
 
     def __init__(self, fh, dirobj, entries):
         super(DirectoryHandle, self).__init__(fh, dirobj)
         self.entries = entries
 
 
     def __init__(self, fh, dirobj, entries):
         super(DirectoryHandle, self).__init__(fh, dirobj)
         self.entries = entries
 
+        for ent in self.entries:
+            ent[1].inc_use()
+
+    def release(self):
+        for ent in self.entries:
+            ent[1].dec_use()
+        super(DirectoryHandle, self).release()
+
 
 class InodeCache(object):
     """Records the memory footprint of objects and when they are last used.
 
 
 class InodeCache(object):
     """Records the memory footprint of objects and when they are last used.
 
-    When the cache limit is exceeded, the least recently used objects are
-    cleared.  Clearing the object means discarding its contents to release
-    memory.  The next time the object is accessed, it must be re-fetched from
-    the server.  Note that the inode cache limit is a soft limit; the cache
-    limit may be exceeded if necessary to load very large objects, it may also
-    be exceeded if open file handles prevent objects from being cleared.
+    When the cache limit is exceeded, the least recently used objects
+    are cleared.  Clearing the object means discarding its contents to
+    release memory.  The next time the object is accessed, it must be
+    re-fetched from the server.  Note that the inode cache limit is a
+    soft limit; the cache limit may be exceeded if necessary to load
+    very large projects or collections.  It may also be exceeded if an
+    inode can't be safely discarded based on kernel lookups
+    (has_ref()) or internal use count (in_use()).
 
     """
 
     def __init__(self, cap, min_entries=4):
 
     """
 
     def __init__(self, cap, min_entries=4):
-        self._entries = collections.OrderedDict()
-        self._by_uuid = {}
+        # Standard dictionaries are ordered, but OrderedDict is still better here; see
+        # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
+        # Specifically, we use move_to_end(), which standard dicts don't have.
+        self._cache_entries = collections.OrderedDict()
         self.cap = cap
         self._total = 0
         self.min_entries = min_entries
         self.cap = cap
         self._total = 0
         self.min_entries = min_entries
@@ -164,104 +159,147 @@ class InodeCache(object):
     def total(self):
         return self._total
 
     def total(self):
         return self._total
 
-    def _remove(self, obj, clear):
-        if clear:
-            # Kernel behavior seems to be that if a file is
-            # referenced, its parents remain referenced too. This
-            # means has_ref() exits early when a collection is not
-            # candidate for eviction.
-            #
-            # By contrast, in_use() doesn't increment references on
-            # parents, so it requires a full tree walk to determine if
-            # a collection is a candidate for eviction.  This takes
-            # .07s for 240000 files, which becomes a major drag when
-            # cap_cache is being called several times a second and
-            # there are multiple non-evictable collections in the
-            # cache.
-            #
-            # So it is important for performance that we do the
-            # has_ref() check first.
-
-            if obj.has_ref(True):
-                _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
-                return
+    def evict_candidates(self):
+        """Yield entries that are candidates to be evicted
+        and stop when the cache total has shrunk sufficiently.
 
 
-            if obj.in_use():
-                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+        Implements an LRU cache: when an item is added or touch()ed it
+        goes to the back of the OrderedDict, so items in the front are
+        oldest.  The Inodes._remove() function determines if the entry
+        can actually be removed safely.
 
 
-            obj.kernel_invalidate()
-            _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
-            obj.clear()
+        """
 
 
-        # The llfuse lock is released in del_entry(), which is called by
-        # Directory.clear().  While the llfuse lock is released, it can happen
-        # that a reentrant call removes this entry before this call gets to it.
-        # Ensure that the entry is still valid before trying to remove it.
-        if obj.inode not in self._entries:
+        if self._total <= self.cap:
             return
 
             return
 
-        self._total -= obj.cache_size
-        del self._entries[obj.inode]
-        if obj.cache_uuid:
-            self._by_uuid[obj.cache_uuid].remove(obj)
-            if not self._by_uuid[obj.cache_uuid]:
-                del self._by_uuid[obj.cache_uuid]
-            obj.cache_uuid = None
-        if clear:
-            _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
+        _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
 
 
-    def cap_cache(self):
-        if self._total > self.cap:
-            for ent in listvalues(self._entries):
-                if self._total < self.cap or len(self._entries) < self.min_entries:
-                    break
-                self._remove(ent, True)
-
-    def manage(self, obj):
-        if obj.persisted():
-            obj.cache_size = obj.objsize()
-            self._entries[obj.inode] = obj
-            obj.cache_uuid = obj.uuid()
-            if obj.cache_uuid:
-                if obj.cache_uuid not in self._by_uuid:
-                    self._by_uuid[obj.cache_uuid] = [obj]
-                else:
-                    if obj not in self._by_uuid[obj.cache_uuid]:
-                        self._by_uuid[obj.cache_uuid].append(obj)
-            self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
-                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
-            self.cap_cache()
+        # Copy this into a deque for two reasons:
+        #
+        # 1. _cache_entries is modified by unmanage() which is called
+        # by _remove
+        #
+        # 2. popping off the front means the reference goes away
+        # immediately instead of sticking around for the lifetime of
+        # "values"
+        values = collections.deque(self._cache_entries.values())
 
 
-    def touch(self, obj):
-        if obj.persisted():
-            if obj.inode in self._entries:
-                self._remove(obj, False)
-            self.manage(obj)
+        while values:
+            if self._total < self.cap or len(self._cache_entries) < self.min_entries:
+                break
+            yield values.popleft()
 
 
-    def unmanage(self, obj):
-        if obj.persisted() and obj.inode in self._entries:
-            self._remove(obj, True)
+    def unmanage(self, entry):
+        """Stop managing an object in the cache.
 
 
-    def find_by_uuid(self, uuid):
-        return self._by_uuid.get(uuid, [])
+        This happens when an object is being removed from the inode
+        entries table.
+
+        """
+
+        if entry.inode not in self._cache_entries:
+            return
+
+        # manage cache size running sum
+        self._total -= entry.cache_size
+        entry.cache_size = 0
+
+        # Now forget about it
+        del self._cache_entries[entry.inode]
+
+    def update_cache_size(self, obj):
+        """Update the cache total in response to the footprint of an
+        object changing (usually because it has been loaded or
+        cleared).
+
+        Adds or removes entries to the cache list based on the object
+        cache size.
+
+        """
+
+        if not obj.persisted():
+            return
+
+        if obj.inode in self._cache_entries:
+            self._total -= obj.cache_size
+
+        obj.cache_size = obj.objsize()
+
+        if obj.cache_size > 0 or obj.parent_inode is None:
+            self._total += obj.cache_size
+            self._cache_entries[obj.inode] = obj
+        elif obj.cache_size == 0 and obj.inode in self._cache_entries:
+            del self._cache_entries[obj.inode]
+
+    def touch(self, obj):
+        """Indicate an object was used recently, making it low
+        priority to be removed from the cache.
+
+        """
+        if obj.inode in self._cache_entries:
+            self._cache_entries.move_to_end(obj.inode)
+            return True
+        return False
 
     def clear(self):
 
     def clear(self):
-        self._entries.clear()
-        self._by_uuid.clear()
+        self._cache_entries.clear()
         self._total = 0
 
         self._total = 0
 
+@dataclass
+class RemoveInode:
+    entry: typing.Union[Directory, File]
+    def inode_op(self, inodes, locked_ops):
+        if locked_ops is None:
+            inodes._remove(self.entry)
+            return True
+        else:
+            locked_ops.append(self)
+            return False
+
+@dataclass
+class InvalidateInode:
+    inode: int
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_inode(self.inode)
+        return True
+
+@dataclass
+class InvalidateEntry:
+    inode: int
+    name: str
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_entry(self.inode, self.name)
+        return True
+
+@dataclass
+class EvictCandidates:
+    def inode_op(self, inodes, locked_ops):
+        return True
+
+
 class Inodes(object):
 class Inodes(object):
-    """Manage the set of inodes.  This is the mapping from a numeric id
-    to a concrete File or Directory object"""
+    """Manage the set of inodes.
+
+    This is the mapping from a numeric id to a concrete File or
+    Directory object
+
+    """
 
 
-    def __init__(self, inode_cache, encoding="utf-8"):
+    def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
         self.encoding = encoding
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
         self.encoding = encoding
-        self.deferred_invalidations = []
+        self._fsns = fsns
+        self._shutdown_started = shutdown_started or threading.Event()
+
+        self._inode_remove_queue = queue.Queue()
+        self._inode_remove_thread = threading.Thread(None, self._inode_remove)
+        self._inode_remove_thread.daemon = True
+        self._inode_remove_thread.start()
+
+        self._by_uuid = collections.defaultdict(list)
 
     def __getitem__(self, item):
         return self._entries[item]
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -279,44 +317,194 @@ class Inodes(object):
         return k in self._entries
 
     def touch(self, entry):
         return k in self._entries
 
     def touch(self, entry):
+        """Update the access time, adjust the cache position, and
+        notify the _inode_remove thread to recheck the cache.
+
+        """
+
         entry._atime = time.time()
         entry._atime = time.time()
-        self.inode_cache.touch(entry)
+        if self.inode_cache.touch(entry):
+            self.cap_cache()
+
+    def cap_cache(self):
+        """Notify the _inode_remove thread to recheck the cache."""
+        if self._inode_remove_queue.empty():
+            self._inode_remove_queue.put(EvictCandidates())
+
+    def update_uuid(self, entry):
+        """Update the Arvados uuid associated with an inode entry.
+
+        This is used to look up inodes that need to be invalidated
+        when a websocket event indicates the object has changed on the
+        API server.
+
+        """
+        if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].remove(entry)
+
+        entry.cache_uuid = entry.uuid()
+        if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].append(entry)
+
+        if not self._by_uuid[entry.cache_uuid]:
+            del self._by_uuid[entry.cache_uuid]
 
     def add_entry(self, entry):
 
     def add_entry(self, entry):
+        """Assign a numeric inode to a new entry."""
+
         entry.inode = next(self._counter)
         if entry.inode == llfuse.ROOT_INODE:
             entry.inc_ref()
         self._entries[entry.inode] = entry
         entry.inode = next(self._counter)
         if entry.inode == llfuse.ROOT_INODE:
             entry.inc_ref()
         self._entries[entry.inode] = entry
-        self.inode_cache.manage(entry)
+
+        self.update_uuid(entry)
+        self.inode_cache.update_cache_size(entry)
+        self.cap_cache()
         return entry
 
     def del_entry(self, entry):
         return entry
 
     def del_entry(self, entry):
-        if entry.ref_count == 0:
-            self.inode_cache.unmanage(entry)
-            del self._entries[entry.inode]
+        """Remove entry from the inode table.
+
+        Indicate this inode entry is pending deletion by setting
+        parent_inode to None.  Notify the _inode_remove thread to try
+        and remove it.
+
+        """
+
+        entry.parent_inode = None
+        self._inode_remove_queue.put(RemoveInode(entry))
+        _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+    def _inode_remove(self):
+        """Background thread to handle tasks related to invalidating
+        inodes in the kernel, and removing objects from the inodes
+        table entirely.
+
+        """
+
+        locked_ops = collections.deque()
+        shutting_down = False
+        while not shutting_down:
+            tasks_done = 0
+            blocking_get = True
+            while True:
+                try:
+                    qentry = self._inode_remove_queue.get(blocking_get)
+                except queue.Empty:
+                    break
+
+                blocking_get = False
+                if qentry is None:
+                    shutting_down = True
+                    continue
+
+                # Process (or defer) this entry
+                qentry.inode_op(self, locked_ops)
+                tasks_done += 1
+
+                # Give up the reference
+                qentry = None
+
+            with llfuse.lock:
+                while locked_ops:
+                    locked_ops.popleft().inode_op(self, None)
+                for entry in self.inode_cache.evict_candidates():
+                    self._remove(entry)
+
+            # Unblock _inode_remove_queue.join() only when all of the
+            # deferred work is done, i.e., after calling inode_op()
+            # and then evict_candidates().
+            for _ in range(tasks_done):
+                self._inode_remove_queue.task_done()
+
+    def wait_remove_queue_empty(self):
+        # used by tests
+        self._inode_remove_queue.join()
+
+    def _remove(self, entry):
+        """Remove an inode entry if possible.
+
+        If the entry is still referenced or in use, don't do anything.
+        If this is not referenced but the parent is still referenced,
+        clear any data held by the object (which may include directory
+        entries under the object) but don't remove it from the inode
+        table.
+
+        """
+        try:
+            if entry.inode is None:
+                # Removed already
+                return
+
+            if entry.inode == llfuse.ROOT_INODE:
+                return
+
+            if entry.in_use():
+                # referenced internally, stay pinned
+                #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+                return
+
+            # Tell the kernel it should forget about it
+            entry.kernel_invalidate()
+
+            if entry.has_ref():
+                # has kernel reference, could still be accessed.
+                # when the kernel forgets about it, we can delete it.
+                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+                return
+
+            # commit any pending changes
             with llfuse.lock_released:
                 entry.finalize()
             with llfuse.lock_released:
                 entry.finalize()
-            entry.inode = None
-        else:
-            entry.dead = True
-            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+            # Clear the contents
+            entry.clear()
+
+            if entry.parent_inode is None:
+                _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
+                              entry.inode, entry.cache_size, self.inode_cache.total(),
+                              len(self._entries), type(entry))
+
+                if entry.cache_uuid:
+                    self._by_uuid[entry.cache_uuid].remove(entry)
+                    if not self._by_uuid[entry.cache_uuid]:
+                        del self._by_uuid[entry.cache_uuid]
+                    entry.cache_uuid = None
+
+                self.inode_cache.unmanage(entry)
+
+                del self._entries[entry.inode]
+                entry.inode = None
+
+        except Exception as e:
+            _logger.exception("failed remove")
 
     def invalidate_inode(self, entry):
 
     def invalidate_inode(self, entry):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_inode(entry.inode)
+            self._inode_remove_queue.put(InvalidateInode(entry.inode))
 
     def invalidate_entry(self, entry, name):
 
     def invalidate_entry(self, entry, name):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+            self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
+
+    def begin_shutdown(self):
+        self._inode_remove_queue.put(None)
+        if self._inode_remove_thread is not None:
+            self._inode_remove_thread.join()
+        self._inode_remove_thread = None
 
     def clear(self):
 
     def clear(self):
+        with llfuse.lock_released:
+            self.begin_shutdown()
+
         self.inode_cache.clear()
         self.inode_cache.clear()
+        self._by_uuid.clear()
 
 
-        for k,v in viewitems(self._entries):
+        for k,v in self._entries.items():
             try:
                 v.finalize()
             except Exception as e:
             try:
                 v.finalize()
             except Exception as e:
@@ -324,6 +512,14 @@ class Inodes(object):
 
         self._entries.clear()
 
 
         self._entries.clear()
 
+    def forward_slash_subst(self):
+        return self._fsns
+
+    def find_by_uuid(self, uuid):
+        """Return a list of zero or more inode entries corresponding
+        to this Arvados UUID."""
+        return self._by_uuid.get(uuid, [])
+
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -336,6 +532,8 @@ def catch_exceptions(orig_func):
             raise
         except EnvironmentError as e:
             raise llfuse.FUSEError(e.errno)
             raise
         except EnvironmentError as e:
             raise llfuse.FUSEError(e.errno)
+        except NotImplementedError:
+            raise llfuse.FUSEError(errno.ENOTSUP)
         except arvados.errors.KeepWriteError as e:
             _logger.error("Keep write error: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
         except arvados.errors.KeepWriteError as e:
             _logger.error("Keep write error: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
@@ -382,14 +580,32 @@ class Operations(llfuse.Operations):
     rename_time = fuse_time.labels(op='rename')
     flush_time = fuse_time.labels(op='flush')
 
     rename_time = fuse_time.labels(op='rename')
     flush_time = fuse_time.labels(op='flush')
 
-    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
+    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
         super(Operations, self).__init__()
 
         self._api_client = api_client
 
         if not inode_cache:
             inode_cache = InodeCache(cap=256*1024*1024)
         super(Operations, self).__init__()
 
         self._api_client = api_client
 
         if not inode_cache:
             inode_cache = InodeCache(cap=256*1024*1024)
-        self.inodes = Inodes(inode_cache, encoding=encoding)
+
+        if fsns is None:
+            try:
+                fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
+            except KeyError:
+                # old API server with no FSNS config
+                fsns = '_'
+            else:
+                if fsns == '' or fsns == '/':
+                    fsns = None
+
+        # If we get overlapping shutdown events (e.g., fusermount -u
+        # -z and operations.destroy()) llfuse calls forget() on inodes
+        # that have already been deleted. To avoid this, we make
+        # forget() a no-op if called after destroy().
+        self._shutdown_started = threading.Event()
+
+        self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
+                             shutdown_started=self._shutdown_started)
         self.uid = uid
         self.gid = gid
         self.enable_write = enable_write
         self.uid = uid
         self.gid = gid
         self.enable_write = enable_write
@@ -402,12 +618,6 @@ class Operations(llfuse.Operations):
         # is fully initialized should wait() on this event object.
         self.initlock = threading.Event()
 
         # is fully initialized should wait() on this event object.
         self.initlock = threading.Event()
 
-        # If we get overlapping shutdown events (e.g., fusermount -u
-        # -z and operations.destroy()) llfuse calls forget() on inodes
-        # that have already been deleted. To avoid this, we make
-        # forget() a no-op if called after destroy().
-        self._shutdown_started = threading.Event()
-
         self.num_retries = num_retries
 
         self.read_counter = arvados.keep.Counter()
         self.num_retries = num_retries
 
         self.read_counter = arvados.keep.Counter()
@@ -443,23 +653,26 @@ class Operations(llfuse.Operations):
     def metric_count_func(self, opname):
         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
 
     def metric_count_func(self, opname):
         return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
 
+    def begin_shutdown(self):
+        self._shutdown_started.set()
+        self.inodes.begin_shutdown()
+
     @destroy_time.time()
     @catch_exceptions
     def destroy(self):
     @destroy_time.time()
     @catch_exceptions
     def destroy(self):
-        self._shutdown_started.set()
+        _logger.debug("arv-mount destroy: start")
+
+        with llfuse.lock_released:
+            self.begin_shutdown()
+
         if self.events:
             self.events.close()
             self.events = None
 
         if self.events:
             self.events.close()
             self.events = None
 
-        # Different versions of llfuse require and forbid us to
-        # acquire the lock here. See #8345#note-37, #10805#note-9.
-        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
-            # llfuse < 0.42
-            self.inodes.clear()
-            llfuse.lock.release()
-        else:
-            # llfuse >= 0.42
-            self.inodes.clear()
+        self.inodes.clear()
+
+        _logger.debug("arv-mount destroy: complete")
+
 
     def access(self, inode, mode, ctx):
         return True
 
     def access(self, inode, mode, ctx):
         return True
@@ -480,39 +693,34 @@ class Operations(llfuse.Operations):
             old_attrs = properties.get("old_attributes") or {}
             new_attrs = properties.get("new_attributes") or {}
 
             old_attrs = properties.get("old_attributes") or {}
             new_attrs = properties.get("new_attributes") or {}
 
-            for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
+            for item in self.inodes.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
                 item.invalidate()
-                if ev.get("object_kind") == "arvados#collection":
-                    pdh = new_attrs.get("portable_data_hash")
-                    # new_attributes.modified_at currently lacks
-                    # subsecond precision (see #6347) so use event_at
-                    # which should always be the same.
-                    stamp = ev.get("event_at")
-                    if (stamp and pdh and item.writable() and
-                        item.collection is not None and
-                        item.collection.modified() and
-                        new_attrs.get("is_trashed") is not True):
-                        item.update(to_record_version=(stamp, pdh))
 
             oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
 
             oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
-                    self.inodes.inode_cache.find_by_uuid(oldowner) +
-                    self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.child_event(ev)
+                    self.inodes.find_by_uuid(oldowner) +
+                    self.inodes.find_by_uuid(newowner)):
+                parent.invalidate()
 
     @getattr_time.time()
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
 
     @getattr_time.time()
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
+            _logger.debug("arv-mount getattr: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         e = self.inodes[inode]
             raise llfuse.FUSEError(errno.ENOENT)
 
         e = self.inodes[inode]
+        self.inodes.touch(e)
+        parent = None
+        if e.parent_inode:
+            parent = self.inodes[e.parent_inode]
+            self.inodes.touch(parent)
 
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
 
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 0
+        entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
@@ -580,18 +788,23 @@ class Operations(llfuse.Operations):
 
         if name == '.':
             inode = parent_inode
 
         if name == '.':
             inode = parent_inode
-        else:
-            if parent_inode in self.inodes:
-                p = self.inodes[parent_inode]
-                self.inodes.touch(p)
-                if name == '..':
-                    inode = p.parent_inode
-                elif isinstance(p, Directory) and name in p:
-                    inode = p[name].inode
+        elif parent_inode in self.inodes:
+            p = self.inodes[parent_inode]
+            self.inodes.touch(p)
+            if name == '..':
+                inode = p.parent_inode
+            elif isinstance(p, Directory) and name in p:
+                if p[name].inode is None:
+                    _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
+                                  parent_inode, name)
+                    raise llfuse.FUSEError(errno.ENOENT)
+
+                inode = p[name].inode
 
         if inode != None:
             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                       parent_inode, name, inode)
 
         if inode != None:
             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                       parent_inode, name, inode)
+            self.inodes.touch(self.inodes[inode])
             self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
             self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
@@ -607,7 +820,7 @@ class Operations(llfuse.Operations):
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
-            if ent.dec_ref(nlookup) == 0 and ent.dead:
+            if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
                 self.inodes.del_entry(ent)
 
     @open_time.time()
                 self.inodes.del_entry(ent)
 
     @open_time.time()
@@ -616,6 +829,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.debug("arv-mount open: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if isinstance(p, Directory):
             raise llfuse.FUSEError(errno.ENOENT)
 
         if isinstance(p, Directory):
@@ -662,7 +876,7 @@ class Operations(llfuse.Operations):
         r = handle.obj.readfrom(off, size, self.num_retries)
         if r:
             self.read_counter.add(len(r))
         r = handle.obj.readfrom(off, size, self.num_retries)
         if r:
             self.read_counter.add(len(r))
-        return r.encode('utf-8')
+        return r
 
     @write_time.time()
     @catch_exceptions
 
     @write_time.time()
     @catch_exceptions
@@ -697,7 +911,7 @@ class Operations(llfuse.Operations):
             finally:
                 self._filehandles[fh].release()
                 del self._filehandles[fh]
             finally:
                 self._filehandles[fh].release()
                 del self._filehandles[fh]
-        self.inodes.inode_cache.cap_cache()
+        self.inodes.cap_cache()
 
     def releasedir(self, fh):
         """Release the directory handle numbered ``fh``.

         This is a thin wrapper: it forwards ``fh`` unchanged to
         ``self.release()`` and does nothing else itself.
         """
         self.release(fh)
 
     def releasedir(self, fh):
         self.release(fh)
@@ -710,6 +924,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if not isinstance(p, Directory):
             raise llfuse.FUSEError(errno.ENOENT)
 
         if not isinstance(p, Directory):
@@ -719,12 +934,16 @@ class Operations(llfuse.Operations):
         if p.parent_inode in self.inodes:
             parent = self.inodes[p.parent_inode]
         else:
         if p.parent_inode in self.inodes:
             parent = self.inodes[p.parent_inode]
         else:
+            _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
             raise llfuse.FUSEError(errno.EIO)
 
             raise llfuse.FUSEError(errno.EIO)
 
+        _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
+
         # update atime
         # update atime
+        p.inc_use()
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
+        p.dec_use()
         self.inodes.touch(p)
         self.inodes.touch(p)
-
-        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
         return fh
 
     @readdir_time.time()
         return fh
 
     @readdir_time.time()
@@ -739,8 +958,9 @@ class Operations(llfuse.Operations):
 
         e = off
         while e < len(handle.entries):
 
         e = off
         while e < len(handle.entries):
-            if handle.entries[e][1].inode in self.inodes:
-                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
+            ent = handle.entries[e]
+            if ent[1].inode in self.inodes:
+                yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
             e += 1
 
     @statfs_time.time()
             e += 1
 
     @statfs_time.time()
@@ -780,7 +1000,7 @@ class Operations(llfuse.Operations):
     @create_time.time()
     @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx=None):
     @create_time.time()
     @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx=None):
-        name = name.decode()
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -798,6 +1018,7 @@ class Operations(llfuse.Operations):
     @mkdir_time.time()
     @catch_exceptions
     def mkdir(self, inode_parent, name, mode, ctx=None):
     @mkdir_time.time()
     @catch_exceptions
     def mkdir(self, inode_parent, name, mode, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -812,6 +1033,7 @@ class Operations(llfuse.Operations):
     @unlink_time.time()
     @catch_exceptions
     def unlink(self, inode_parent, name, ctx=None):
     @unlink_time.time()
     @catch_exceptions
     def unlink(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
@@ -819,6 +1041,7 @@ class Operations(llfuse.Operations):
     @rmdir_time.time()
     @catch_exceptions
     def rmdir(self, inode_parent, name, ctx=None):
     @rmdir_time.time()
     @catch_exceptions
     def rmdir(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
@@ -826,6 +1049,8 @@ class Operations(llfuse.Operations):
     @rename_time.time()
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
     @rename_time.time()
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
+        name_old = name_old.decode(encoding=self.inodes.encoding)
+        name_new = name_new.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
         dest = self._check_writable(inode_parent_new)
         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
         dest = self._check_writable(inode_parent_new)