21541: Code cleanup and additional memory usage improvements
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index f1e49f5afcffff32143b9033c5f83dddcd0c7c65..2dfba464738868b5ced8e468e26e9eb1d5db9e34 100644 (file)
@@ -47,48 +47,40 @@ The general FUSE operation flow is as follows:
 The FUSE driver supports the Arvados event bus.  When an event is received for
 an object that is live in the inode cache, that object is immediately updated.
 
+Implementation note: in the code, the terms 'object', 'entry' and
+'inode' are used somewhat interchangeably, but generally mean an
+arvados_fuse.File or arvados_fuse.Directory object which has numeric
+inode assigned to it and appears in the Inodes._entries dictionary.
+
 """
 
+from __future__ import absolute_import
+from __future__ import division
+from builtins import next
+from builtins import str
+from builtins import object
 import os
-import sys
 import llfuse
 import errno
 import stat
 import threading
 import arvados
-import pprint
 import arvados.events
-import re
-import apiclient
-import json
 import logging
 import time
-import _strptime
-import calendar
 import threading
 import itertools
-import ciso8601
 import collections
 import functools
 import arvados.keep
+from prometheus_client import Summary
+import queue
+from dataclasses import dataclass
+import typing
+import gc
 
-import Queue
-
-# Default _notify_queue has a limit of 1000 items, but it really needs to be
-# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
-# details.
-
-if hasattr(llfuse, 'capi'):
-    # llfuse < 0.42
-    llfuse.capi._notify_queue = Queue.Queue()
-else:
-    # llfuse >= 0.42
-    llfuse._notify_queue = Queue.Queue()
-
-LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
-
-from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from fusefile import StringFile, FuseArvadosFile
+from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from .fusefile import File, StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -125,28 +117,44 @@ class FileHandle(Handle):
 
 class DirectoryHandle(Handle):
     """Connects a numeric file handle to a Directory object that has
-    been opened by the client."""
+    been opened by the client.
+
+    DirectoryHandle is used by opendir() and readdir() to get
+    directory listings.  Entries returned by readdir() don't increment
+    the lookup count (kernel references), so increment our internal
+    "use count" to avoid having an item being removed mid-read.
+
+    """
 
     def __init__(self, fh, dirobj, entries):
         super(DirectoryHandle, self).__init__(fh, dirobj)
         self.entries = entries
 
+        for ent in self.entries:
+            ent[1].inc_use()
+
+    def release(self):
+        for ent in self.entries:
+            ent[1].dec_use()
+        super(DirectoryHandle, self).release()
+
 
 class InodeCache(object):
     """Records the memory footprint of objects and when they are last used.
 
-    When the cache limit is exceeded, the least recently used objects are
-    cleared.  Clearing the object means discarding its contents to release
-    memory.  The next time the object is accessed, it must be re-fetched from
-    the server.  Note that the inode cache limit is a soft limit; the cache
-    limit may be exceeded if necessary to load very large objects, it may also
-    be exceeded if open file handles prevent objects from being cleared.
+    When the cache limit is exceeded, the least recently used objects
+    are cleared.  Clearing the object means discarding its contents to
+    release memory.  The next time the object is accessed, it must be
+    re-fetched from the server.  Note that the inode cache limit is a
+    soft limit; the cache limit may be exceeded if necessary to load
+    very large projects or collections, it may also be exceeded if an
+    inode can't be safely discarded based on kernel lookups
+    (has_ref()) or internal use count (in_use()).
 
     """
 
     def __init__(self, cap, min_entries=4):
-        self._entries = collections.OrderedDict()
-        self._by_uuid = {}
+        self._cache_entries = collections.OrderedDict()
         self.cap = cap
         self._total = 0
         self.min_entries = min_entries
@@ -154,104 +162,148 @@ class InodeCache(object):
     def total(self):
         return self._total
 
-    def _remove(self, obj, clear):
-        if clear:
-            # Kernel behavior seems to be that if a file is
-            # referenced, its parents remain referenced too. This
-            # means has_ref() exits early when a collection is not
-            # candidate for eviction.
-            #
-            # By contrast, in_use() doesn't increment references on
-            # parents, so it requires a full tree walk to determine if
-            # a collection is a candidate for eviction.  This takes
-            # .07s for 240000 files, which becomes a major drag when
-            # cap_cache is being called several times a second and
-            # there are multiple non-evictable collections in the
-            # cache.
-            #
-            # So it is important for performance that we do the
-            # has_ref() check first.
-
-            if obj.has_ref(True):
-                _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
-                return
+    def evict_candidates(self):
+        """Yield entries that are candidates to be evicted
+        and stop when the cache total has shrunk sufficiently.
 
-            if obj.in_use():
-                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+        Implements a LRU cache, when an item is added or touch()ed it
+        goes to the back of the OrderedDict, so items in the front are
+        oldest.  The Inodes._remove() function determines if the entry
+        can actually be removed safely.
 
-            obj.kernel_invalidate()
-            _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
-            obj.clear()
+        """
 
-        # The llfuse lock is released in del_entry(), which is called by
-        # Directory.clear().  While the llfuse lock is released, it can happen
-        # that a reentrant call removes this entry before this call gets to it.
-        # Ensure that the entry is still valid before trying to remove it.
-        if obj.inode not in self._entries:
+        if self._total <= self.cap:
             return
 
-        self._total -= obj.cache_size
-        del self._entries[obj.inode]
-        if obj.cache_uuid:
-            self._by_uuid[obj.cache_uuid].remove(obj)
-            if not self._by_uuid[obj.cache_uuid]:
-                del self._by_uuid[obj.cache_uuid]
-            obj.cache_uuid = None
-        if clear:
-            _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
+        _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
 
-    def cap_cache(self):
-        if self._total > self.cap:
-            for ent in self._entries.values():
-                if self._total < self.cap or len(self._entries) < self.min_entries:
-                    break
-                self._remove(ent, True)
-
-    def manage(self, obj):
-        if obj.persisted():
-            obj.cache_size = obj.objsize()
-            self._entries[obj.inode] = obj
-            obj.cache_uuid = obj.uuid()
-            if obj.cache_uuid:
-                if obj.cache_uuid not in self._by_uuid:
-                    self._by_uuid[obj.cache_uuid] = [obj]
-                else:
-                    if obj not in self._by_uuid[obj.cache_uuid]:
-                        self._by_uuid[obj.cache_uuid].append(obj)
-            self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
-                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
-            self.cap_cache()
+        # Copy this into a deque for two reasons:
+        #
+        # 1. _cache_entries is modified by unmanage() which is called
+        # by _remove
+        #
+        # 2. popping off the front means the reference goes away
+        # immediately intead of sticking around for the lifetime of
+        # "values"
+        values = collections.deque(self._cache_entries.values())
 
-    def touch(self, obj):
-        if obj.persisted():
-            if obj.inode in self._entries:
-                self._remove(obj, False)
-            self.manage(obj)
+        while len(values) > 0:
+            if self._total < self.cap or len(self._cache_entries) < self.min_entries:
+                break
+            yield values.popleft()
 
-    def unmanage(self, obj):
-        if obj.persisted() and obj.inode in self._entries:
-            self._remove(obj, True)
+    def unmanage(self, entry):
+        """Stop managing an object in the cache.
 
-    def find_by_uuid(self, uuid):
-        return self._by_uuid.get(uuid, [])
+        This happens when an object is being removed from the inode
+        entries table.
+
+        """
+
+        if entry.inode not in self._cache_entries:
+            return
+
+        # manage cache size running sum
+        self._total -= entry.cache_size
+        entry.cache_size = 0
+
+        # Now forget about it
+        del self._cache_entries[entry.inode]
+
+    def update_cache_size(self, obj):
+        """Update the cache total in response to the footprint of an
+        object changing (usually because it has been loaded or
+        cleared).
+
+        Adds or removes entries to the cache list based on the object
+        cache size.
+
+        """
+
+        if not obj.persisted():
+            return
+
+        if obj.inode in self._cache_entries:
+            self._total -= obj.cache_size
+
+        obj.cache_size = obj.objsize()
+
+        if obj.cache_size > 0 or obj.parent_inode is None:
+            self._total += obj.cache_size
+            self._cache_entries[obj.inode] = obj
+        elif obj.cache_size == 0 and obj.inode in self._cache_entries:
+            del self._cache_entries[obj.inode]
+
+    def touch(self, obj):
+        """Indicate an object was used recently, making it low
+        priority to be removed from the cache.
+
+        """
+        if obj.inode in self._cache_entries:
+            self._cache_entries.move_to_end(obj.inode)
+            return True
+        return False
 
     def clear(self):
-        self._entries.clear()
-        self._by_uuid.clear()
+        self._cache_entries.clear()
         self._total = 0
 
+@dataclass
+class RemoveInode:
+    entry: typing.Union[Directory, File]
+    def inode_op(self, inodes, locked_ops):
+        if locked_ops is None:
+            inodes._remove(self.entry)
+            return True
+        else:
+            locked_ops.append(self)
+            return False
+
+@dataclass
+class InvalidateInode:
+    inode: int
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_inode(self.inode)
+        return True
+
+@dataclass
+class InvalidateEntry:
+    inode: int
+    name: str
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_entry(self.inode, self.name)
+        return True
+
+@dataclass
+class EvictCandidates:
+    def inode_op(self, inodes, locked_ops):
+        return True
+
+
 class Inodes(object):
-    """Manage the set of inodes.  This is the mapping from a numeric id
-    to a concrete File or Directory object"""
+    """Manage the set of inodes.
+
+    This is the mapping from a numeric id to a concrete File or
+    Directory object
+
+    """
 
-    def __init__(self, inode_cache, encoding="utf-8"):
+    def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
         self.encoding = encoding
-        self.deferred_invalidations = []
+        self._fsns = fsns
+        self._shutdown_started = shutdown_started or threading.Event()
+
+        self._inode_remove_queue = queue.Queue()
+        self._inode_remove_thread = threading.Thread(None, self._inode_remove)
+        self._inode_remove_thread.daemon = True
+        self._inode_remove_thread.start()
+
+        self.cap_cache_event = threading.Event()
+        self._by_uuid = collections.defaultdict(list)
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -260,7 +312,7 @@ class Inodes(object):
         self._entries[key] = item
 
     def __iter__(self):
-        return self._entries.iterkeys()
+        return iter(self._entries.keys())
 
     def items(self):
         return self._entries.items()
@@ -269,42 +321,188 @@ class Inodes(object):
         return k in self._entries
 
     def touch(self, entry):
+        """Update the access time, adjust the cache position, and
+        notify the _inode_remove thread to recheck the cache.
+
+        """
+
         entry._atime = time.time()
-        self.inode_cache.touch(entry)
+        if self.inode_cache.touch(entry):
+            self.cap_cache()
+
+    def cap_cache(self):
+        """Notify the _inode_remove thread to recheck the cache."""
+        if not self.cap_cache_event.is_set():
+            self.cap_cache_event.set()
+            self._inode_remove_queue.put(EvictCandidates())
+
+    def update_uuid(self, entry):
+        """Update the Arvados uuid associated with an inode entry.
+
+        This is used to look up inodes that need to be invalidated
+        when a websocket event indicates the object has changed on the
+        API server.
+
+        """
+        if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].remove(entry)
+
+        entry.cache_uuid = entry.uuid()
+        if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].append(entry)
+
+        if not self._by_uuid[entry.cache_uuid]:
+            del self._by_uuid[entry.cache_uuid]
 
     def add_entry(self, entry):
+        """Assign a numeric inode to a new entry."""
+
         entry.inode = next(self._counter)
         if entry.inode == llfuse.ROOT_INODE:
             entry.inc_ref()
         self._entries[entry.inode] = entry
-        self.inode_cache.manage(entry)
+
+        self.update_uuid(entry)
+        self.inode_cache.update_cache_size(entry)
+        self.cap_cache()
         return entry
 
     def del_entry(self, entry):
-        if entry.ref_count == 0:
-            self.inode_cache.unmanage(entry)
-            del self._entries[entry.inode]
+        """Remove entry from the inode table.
+
+        Indicate this inode entry is pending deletion by setting
+        parent_inode to None.  Notify the _inode_remove thread to try
+        and remove it.
+
+        """
+
+        entry.parent_inode = None
+        self._inode_remove_queue.put(RemoveInode(entry))
+        _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+    def _inode_remove(self):
+        """Background thread to handle tasks related to invalidating
+        inodes in the kernel, and removing objects from the inodes
+        table entirely.
+
+        """
+
+        locked_ops = collections.deque()
+        while True:
+            blocking_get = True
+            while True:
+                try:
+                    qentry = self._inode_remove_queue.get(blocking_get)
+                except queue.Empty:
+                    break
+                blocking_get = False
+                if qentry is None:
+                    return
+
+                if self._shutdown_started.is_set():
+                    continue
+
+                # Process this entry
+                if qentry.inode_op(self, locked_ops):
+                    self._inode_remove_queue.task_done()
+
+                # Give up the reference
+                qentry = None
+
+            with llfuse.lock:
+                while len(locked_ops) > 0:
+                    if locked_ops.popleft().inode_op(self, None):
+                        self._inode_remove_queue.task_done()
+                self.cap_cache_event.clear()
+                for entry in self.inode_cache.evict_candidates():
+                    self._remove(entry)
+
+    def wait_remove_queue_empty(self):
+        # used by tests
+        self._inode_remove_queue.join()
+
+    def _remove(self, entry):
+        """Remove an inode entry if possible.
+
+        If the entry is still referenced or in use, don't do anything.
+        If this is not referenced but the parent is still referenced,
+        clear any data held by the object (which may include directory
+        entries under the object) but don't remove it from the inode
+        table.
+
+        """
+        try:
+            if entry.inode is None:
+                # Removed already
+                return
+
+            if entry.inode == llfuse.ROOT_INODE:
+                return
+
+            if entry.in_use():
+                # referenced internally, stay pinned
+                #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+                return
+
+            # Tell the kernel it should forget about it
+            entry.kernel_invalidate()
+
+            if entry.has_ref():
+                # has kernel reference, could still be accessed.
+                # when the kernel forgets about it, we can delete it.
+                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+                return
+
+            # commit any pending changes
             with llfuse.lock_released:
                 entry.finalize()
-            entry.inode = None
-        else:
-            entry.dead = True
-            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+            # Clear the contents
+            entry.clear()
+
+            if entry.parent_inode is None:
+                _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
+                              entry.inode, entry.cache_size, self.inode_cache.total(),
+                              len(self._entries), type(entry))
+
+                if entry.cache_uuid:
+                    self._by_uuid[entry.cache_uuid].remove(entry)
+                    if not self._by_uuid[entry.cache_uuid]:
+                        del self._by_uuid[entry.cache_uuid]
+                    entry.cache_uuid = None
+
+                self.inode_cache.unmanage(entry)
+
+                del self._entries[entry.inode]
+                entry.inode = None
+
+        except Exception as e:
+            _logger.exception("failed remove")
 
     def invalidate_inode(self, entry):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_inode(entry.inode)
+            self._inode_remove_queue.put(InvalidateInode(entry.inode))
 
     def invalidate_entry(self, entry, name):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_entry(entry.inode, name.encode(self.encoding))
+            self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
+
+    def begin_shutdown(self):
+        self._inode_remove_queue.put(None)
+        if self._inode_remove_thread is not None:
+            self._inode_remove_thread.join()
+        self._inode_remove_thread = None
 
     def clear(self):
+        with llfuse.lock_released:
+            self.begin_shutdown()
+
         self.inode_cache.clear()
+        self._by_uuid.clear()
 
         for k,v in self._entries.items():
             try:
@@ -314,6 +512,14 @@ class Inodes(object):
 
         self._entries.clear()
 
+    def forward_slash_subst(self):
+        return self._fsns
+
+    def find_by_uuid(self, uuid):
+        """Return a list of zero or more inode entries corresponding
+        to this Arvados UUID."""
+        return self._by_uuid.get(uuid, [])
+
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -326,6 +532,8 @@ def catch_exceptions(orig_func):
             raise
         except EnvironmentError as e:
             raise llfuse.FUSEError(e.errno)
+        except NotImplementedError:
+            raise llfuse.FUSEError(errno.ENOTSUP)
         except arvados.errors.KeepWriteError as e:
             _logger.error("Keep write error: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
@@ -351,14 +559,53 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
+    fuse_time = Summary('arvmount_fuse_operations_seconds', 'Time spent during FUSE operations', labelnames=['op'])
+    read_time = fuse_time.labels(op='read')
+    write_time = fuse_time.labels(op='write')
+    destroy_time = fuse_time.labels(op='destroy')
+    on_event_time = fuse_time.labels(op='on_event')
+    getattr_time = fuse_time.labels(op='getattr')
+    setattr_time = fuse_time.labels(op='setattr')
+    lookup_time = fuse_time.labels(op='lookup')
+    forget_time = fuse_time.labels(op='forget')
+    open_time = fuse_time.labels(op='open')
+    release_time = fuse_time.labels(op='release')
+    opendir_time = fuse_time.labels(op='opendir')
+    readdir_time = fuse_time.labels(op='readdir')
+    statfs_time = fuse_time.labels(op='statfs')
+    create_time = fuse_time.labels(op='create')
+    mkdir_time = fuse_time.labels(op='mkdir')
+    unlink_time = fuse_time.labels(op='unlink')
+    rmdir_time = fuse_time.labels(op='rmdir')
+    rename_time = fuse_time.labels(op='rename')
+    flush_time = fuse_time.labels(op='flush')
+
+    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
         super(Operations, self).__init__()
 
         self._api_client = api_client
 
         if not inode_cache:
             inode_cache = InodeCache(cap=256*1024*1024)
-        self.inodes = Inodes(inode_cache, encoding=encoding)
+
+        if fsns is None:
+            try:
+                fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
+            except KeyError:
+                # old API server with no FSNS config
+                fsns = '_'
+            else:
+                if fsns == '' or fsns == '/':
+                    fsns = None
+
+        # If we get overlapping shutdown events (e.g., fusermount -u
+        # -z and operations.destroy()) llfuse calls forget() on inodes
+        # that have already been deleted. To avoid this, we make
+        # forget() a no-op if called after destroy().
+        self._shutdown_started = threading.Event()
+
+        self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
+                             shutdown_started=self._shutdown_started)
         self.uid = uid
         self.gid = gid
         self.enable_write = enable_write
@@ -371,12 +618,6 @@ class Operations(llfuse.Operations):
         # is fully initialized should wait() on this event object.
         self.initlock = threading.Event()
 
-        # If we get overlapping shutdown events (e.g., fusermount -u
-        # -z and operations.destroy()) llfuse calls forget() on inodes
-        # that have already been deleted. To avoid this, we make
-        # forget() a no-op if called after destroy().
-        self._shutdown_started = threading.Event()
-
         self.num_retries = num_retries
 
         self.read_counter = arvados.keep.Counter()
@@ -385,28 +626,60 @@ class Operations(llfuse.Operations):
         self.write_ops_counter = arvados.keep.Counter()
 
         self.events = None
+l
+        # We rely on the cyclic garbage collector to deallocate
+        # Collection objects from the Python SDK.  A lower GC
+        # threshold encourages Python to be more aggressive in
+        # reclaiming these and seems to slow down the growth in memory
+        # usage over time.
+        gc.set_threshold(200)
 
     def init(self):
         # Allow threads that are waiting for the driver to be finished
         # initializing to continue
         self.initlock.set()
 
+    def metric_samples(self):
+        return self.fuse_time.collect()[0].samples
+
+    def metric_op_names(self):
+        ops = []
+        for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
+            if cur_op not in ops:
+                ops.append(cur_op)
+        return ops
+
+    def metric_value(self, opname, metric):
+        op_value = [sample.value for sample in self.metric_samples()
+                    if sample.name == metric and sample.labels['op'] == opname]
+        return op_value[0] if len(op_value) == 1 else None
+
+    def metric_sum_func(self, opname):
+        return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
+
+    def metric_count_func(self, opname):
+        return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+
+    def begin_shutdown(self):
+        self._shutdown_started.set()
+        self.inodes.begin_shutdown()
+
+    @destroy_time.time()
     @catch_exceptions
     def destroy(self):
-        self._shutdown_started.set()
+        _logger.debug("arv-mount destroy: start")
+
+        with llfuse.lock_released:
+            self.begin_shutdown()
+
         if self.events:
             self.events.close()
             self.events = None
 
-        # Different versions of llfuse require and forbid us to
-        # acquire the lock here. See #8345#note-37, #10805#note-9.
-        if LLFUSE_VERSION_0 and llfuse.lock.acquire():
-            # llfuse < 0.42
-            self.inodes.clear()
-            llfuse.lock.release()
-        else:
-            # llfuse >= 0.42
-            self.inodes.clear()
+        self.inodes.clear()
+
+        _logger.debug("arv-mount destroy: complete")
+
 
     def access(self, inode, mode, ctx):
         return True
@@ -417,6 +690,7 @@ class Operations(llfuse.Operations):
             [["event_type", "in", ["create", "update", "delete"]]],
             self.on_event)
 
+    @on_event_time.time()
     @catch_exceptions
     def on_event(self, ev):
         if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
@@ -426,38 +700,34 @@ class Operations(llfuse.Operations):
             old_attrs = properties.get("old_attributes") or {}
             new_attrs = properties.get("new_attributes") or {}
 
-            for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
+            for item in self.inodes.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if ev.get("object_kind") == "arvados#collection":
-                    pdh = new_attrs.get("portable_data_hash")
-                    # new_attributes.modified_at currently lacks
-                    # subsecond precision (see #6347) so use event_at
-                    # which should always be the same.
-                    stamp = ev.get("event_at")
-                    if (stamp and pdh and item.writable() and
-                        item.collection is not None and
-                        item.collection.modified() and
-                        new_attrs.get("is_trashed") is not True):
-                        item.update(to_record_version=(stamp, pdh))
 
             oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
-                    self.inodes.inode_cache.find_by_uuid(oldowner) +
-                    self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.child_event(ev)
+                    self.inodes.find_by_uuid(oldowner) +
+                    self.inodes.find_by_uuid(newowner)):
+                parent.invalidate()
 
+    @getattr_time.time()
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
+            _logger.debug("arv-mount getattr: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         e = self.inodes[inode]
+        self.inodes.touch(e)
+        parent = None
+        if e.parent_inode:
+            parent = self.inodes[e.parent_inode]
+            self.inodes.touch(parent)
 
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 0
+        entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
@@ -479,7 +749,7 @@ class Operations(llfuse.Operations):
         entry.st_size = e.size()
 
         entry.st_blksize = 512
-        entry.st_blocks = (entry.st_size/512)+1
+        entry.st_blocks = (entry.st_size // 512) + 1
         if hasattr(entry, 'st_atime_ns'):
             # llfuse >= 0.42
             entry.st_atime_ns = int(e.atime() * 1000000000)
@@ -493,6 +763,7 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @setattr_time.time()
     @catch_exceptions
     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
         entry = self.getattr(inode)
@@ -516,25 +787,31 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @lookup_time.time()
     @catch_exceptions
     def lookup(self, parent_inode, name, ctx=None):
-        name = unicode(name, self.inodes.encoding)
+        name = str(name, self.inodes.encoding)
         inode = None
 
         if name == '.':
             inode = parent_inode
-        else:
-            if parent_inode in self.inodes:
-                p = self.inodes[parent_inode]
-                self.inodes.touch(p)
-                if name == '..':
-                    inode = p.parent_inode
-                elif isinstance(p, Directory) and name in p:
-                    inode = p[name].inode
+        elif parent_inode in self.inodes:
+            p = self.inodes[parent_inode]
+            self.inodes.touch(p)
+            if name == '..':
+                inode = p.parent_inode
+            elif isinstance(p, Directory) and name in p:
+                if p[name].inode is None:
+                    _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
+                                  parent_inode, name)
+                    raise llfuse.FUSEError(errno.ENOENT)
+
+                inode = p[name].inode
 
         if inode != None:
             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                       parent_inode, name, inode)
+            self.inodes.touch(self.inodes[inode])
             self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
@@ -542,6 +819,7 @@ class Operations(llfuse.Operations):
                       parent_inode, name)
             raise llfuse.FUSEError(errno.ENOENT)
 
+    @forget_time.time()
     @catch_exceptions
     def forget(self, inodes):
         if self._shutdown_started.is_set():
@@ -549,14 +827,16 @@ class Operations(llfuse.Operations):
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
-            if ent.dec_ref(nlookup) == 0 and ent.dead:
+            if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
                 self.inodes.del_entry(ent)
 
+    @open_time.time()
     @catch_exceptions
     def open(self, inode, flags, ctx=None):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.debug("arv-mount open: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if isinstance(p, Directory):
@@ -587,6 +867,7 @@ class Operations(llfuse.Operations):
 
         return fh
 
+    @read_time.time()
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
@@ -604,6 +885,7 @@ class Operations(llfuse.Operations):
             self.read_counter.add(len(r))
         return r
 
+    @write_time.time()
     @catch_exceptions
     def write(self, fh, off, buf):
         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
@@ -624,6 +906,7 @@ class Operations(llfuse.Operations):
             self.write_counter.add(w)
         return w
 
+    @release_time.time()
     @catch_exceptions
     def release(self, fh):
         if fh in self._filehandles:
@@ -635,11 +918,12 @@ class Operations(llfuse.Operations):
             finally:
                 self._filehandles[fh].release()
                 del self._filehandles[fh]
-        self.inodes.inode_cache.cap_cache()
+        self.inodes.cap_cache()
 
     def releasedir(self, fh):
         self.release(fh)
 
+    @opendir_time.time()
     @catch_exceptions
     def opendir(self, inode, ctx=None):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -647,6 +931,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if not isinstance(p, Directory):
@@ -656,14 +941,19 @@ class Operations(llfuse.Operations):
         if p.parent_inode in self.inodes:
             parent = self.inodes[p.parent_inode]
         else:
+            _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
             raise llfuse.FUSEError(errno.EIO)
 
+        _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
+
         # update atime
+        p.inc_use()
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
+        p.dec_use()
         self.inodes.touch(p)
-
-        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
         return fh
 
+    @readdir_time.time()
     @catch_exceptions
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -675,10 +965,12 @@ class Operations(llfuse.Operations):
 
         e = off
         while e < len(handle.entries):
-            if handle.entries[e][1].inode in self.inodes:
-                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
+            ent = handle.entries[e]
+            if ent[1].inode in self.inodes:
+                yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
             e += 1
 
+    @statfs_time.time()
     @catch_exceptions
     def statfs(self, ctx=None):
         st = llfuse.StatvfsData()
@@ -712,8 +1004,10 @@ class Operations(llfuse.Operations):
 
         return p
 
+    @create_time.time()
     @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -728,8 +1022,10 @@ class Operations(llfuse.Operations):
         f.inc_ref()
         return (fh, self.getattr(f.inode))
 
+    @mkdir_time.time()
     @catch_exceptions
     def mkdir(self, inode_parent, name, mode, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -741,25 +1037,33 @@ class Operations(llfuse.Operations):
         d.inc_ref()
         return self.getattr(d.inode)
 
+    @unlink_time.time()
     @catch_exceptions
     def unlink(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
 
+    @rmdir_time.time()
     @catch_exceptions
     def rmdir(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
 
+    @rename_time.time()
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
+        name_old = name_old.decode(encoding=self.inodes.encoding)
+        name_new = name_new.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
         dest = self._check_writable(inode_parent_new)
         dest.rename(name_old, name_new, src)
 
+    @flush_time.time()
     @catch_exceptions
     def flush(self, fh):
         if fh in self._filehandles: