X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b9fde93b6e24b0575ce81a964be6884231647ee4..HEAD:/services/fuse/arvados_fuse/__init__.py diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index 08a44c9533..c39afa4757 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -54,17 +54,6 @@ inode assigned to it and appears in the Inodes._entries dictionary. """ -from __future__ import absolute_import -from __future__ import division -from future.utils import viewitems -from future.utils import native -from future.utils import listvalues -from future.utils import listitems -from future import standard_library -standard_library.install_aliases() -from builtins import next -from builtins import str -from builtins import object import os import llfuse import errno @@ -81,9 +70,11 @@ import functools import arvados.keep from prometheus_client import Summary import queue +from dataclasses import dataclass +import typing from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase -from .fusefile import StringFile, FuseArvadosFile +from .fusefile import File, StringFile, FuseArvadosFile _logger = logging.getLogger('arvados.arvados_fuse') @@ -157,8 +148,10 @@ class InodeCache(object): """ def __init__(self, cap, min_entries=4): + # Standard dictionaries are ordered, but OrderedDict is still better here, see + # https://docs.python.org/3.11/library/collections.html#ordereddict-objects + # specifically we use move_to_end() which standard dicts don't have. self._cache_entries = collections.OrderedDict() - self._by_uuid = {} self.cap = cap self._total = 0 self.min_entries = min_entries @@ -181,39 +174,21 @@ class InodeCache(object): return _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries)) - for ent in listvalues(self._cache_entries): + + # Copy this into a deque for two reasons: + # + # 1. _cache_entries is modified by unmanage() which is called + # by _remove + # + # 2. popping off the front means the reference goes away + # immediately intead of sticking around for the lifetime of + # "values" + values = collections.deque(self._cache_entries.values()) + + while values: if self._total < self.cap or len(self._cache_entries) < self.min_entries: break - if ent.cache_size > 0 or ent.dead: - # if cache_size is zero it's been cleared already - yield ent - - def manage(self, obj): - """Add a new object to be cache managed. - - This means evict_candidates will suggest clearing and removing - the inode when there is memory pressure. - - """ - - if obj.inode in self._cache_entries: - return - - obj.cache_size = obj.objsize() - self._total += obj.cache_size - - self._cache_entries[obj.inode] = obj - - obj.cache_uuid = obj.uuid() - if obj.cache_uuid: - if obj.cache_uuid not in self._by_uuid: - self._by_uuid[obj.cache_uuid] = [obj] - else: - if obj not in self._by_uuid[obj.cache_uuid]: - self._by_uuid[obj.cache_uuid].append(obj) - - _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)", - obj.inode, obj.cache_size, obj.cache_uuid, self._total, len(self._cache_entries)) + yield values.popleft() def unmanage(self, entry): """Stop managing an object in the cache. @@ -230,13 +205,6 @@ class InodeCache(object): self._total -= entry.cache_size entry.cache_size = 0 - # manage the mapping of uuid to object - if entry.cache_uuid: - self._by_uuid[entry.cache_uuid].remove(entry) - if not self._by_uuid[entry.cache_uuid]: - del self._by_uuid[entry.cache_uuid] - entry.cache_uuid = None - # Now forget about it del self._cache_entries[entry.inode] @@ -245,11 +213,24 @@ class InodeCache(object): object changing (usually because it has been loaded or cleared). + Adds or removes entries to the cache list based on the object + cache size. + """ + + if not obj.persisted(): + return + if obj.inode in self._cache_entries: self._total -= obj.cache_size - obj.cache_size = obj.objsize() + + obj.cache_size = obj.objsize() + + if obj.cache_size > 0 or obj.parent_inode is None: self._total += obj.cache_size + self._cache_entries[obj.inode] = obj + elif obj.cache_size == 0 and obj.inode in self._cache_entries: + del self._cache_entries[obj.inode] def touch(self, obj): """Indicate an object was used recently, making it low @@ -258,17 +239,44 @@ class InodeCache(object): """ if obj.inode in self._cache_entries: self._cache_entries.move_to_end(obj.inode) - else: - self.manage(obj) - - def find_by_uuid(self, uuid): - return self._by_uuid.get(uuid, []) + return True + return False def clear(self): self._cache_entries.clear() - self._by_uuid.clear() self._total = 0 +@dataclass +class RemoveInode: + entry: typing.Union[Directory, File] + def inode_op(self, inodes, locked_ops): + if locked_ops is None: + inodes._remove(self.entry) + return True + else: + locked_ops.append(self) + return False + +@dataclass +class InvalidateInode: + inode: int + def inode_op(self, inodes, locked_ops): + llfuse.invalidate_inode(self.inode) + return True + +@dataclass +class InvalidateEntry: + inode: int + name: str + def inode_op(self, inodes, locked_ops): + llfuse.invalidate_entry(self.inode, self.name) + return True + +@dataclass +class EvictCandidates: + def inode_op(self, inodes, locked_ops): + return True + class Inodes(object): """Manage the set of inodes. @@ -291,6 +299,8 @@ class Inodes(object): self._inode_remove_thread.daemon = True self._inode_remove_thread.start() + self._by_uuid = collections.defaultdict(list) + def __getitem__(self, item): return self._entries[item] @@ -301,7 +311,7 @@ class Inodes(object): return iter(self._entries.keys()) def items(self): - return viewitems(self._entries.items()) + return self._entries.items() def __contains__(self, k): return k in self._entries @@ -313,12 +323,31 @@ class Inodes(object): """ entry._atime = time.time() - self.inode_cache.touch(entry) - self.cap_cache() + if self.inode_cache.touch(entry): + self.cap_cache() def cap_cache(self): """Notify the _inode_remove thread to recheck the cache.""" - self._inode_remove_queue.put(("evict_candidates",)) + if self._inode_remove_queue.empty(): + self._inode_remove_queue.put(EvictCandidates()) + + def update_uuid(self, entry): + """Update the Arvados uuid associated with an inode entry. + + This is used to look up inodes that need to be invalidated + when a websocket event indicates the object has changed on the + API server. + + """ + if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]: + self._by_uuid[entry.cache_uuid].remove(entry) + + entry.cache_uuid = entry.uuid() + if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]: + self._by_uuid[entry.cache_uuid].append(entry) + + if not self._by_uuid[entry.cache_uuid]: + del self._by_uuid[entry.cache_uuid] def add_entry(self, entry): """Assign a numeric inode to a new entry.""" @@ -327,23 +356,23 @@ class Inodes(object): if entry.inode == llfuse.ROOT_INODE: entry.inc_ref() self._entries[entry.inode] = entry - if entry.persisted(): - # only "persisted" items can be reloaded from the server - # making them safe to evict automatically. - self.inode_cache.manage(entry) + + self.update_uuid(entry) + self.inode_cache.update_cache_size(entry) self.cap_cache() return entry def del_entry(self, entry): """Remove entry from the inode table. - Put a tombstone marker on it and notify the _inode_remove - thread to try and remove it. + Indicate this inode entry is pending deletion by setting + parent_inode to None. Notify the _inode_remove thread to try + and remove it. """ - entry.dead = True - self._inode_remove_queue.put(("remove", entry)) + entry.parent_inode = None + self._inode_remove_queue.put(RemoveInode(entry)) _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count) def _inode_remove(self): @@ -354,62 +383,39 @@ class Inodes(object): """ locked_ops = collections.deque() - while True: - try: - entry = self._inode_remove_queue.get(True) - if entry is None: - return - # Process this entry - _logger.debug("_inode_remove %s", entry) - if self._inode_op(entry, locked_ops): - self._inode_remove_queue.task_done() - - # Drain the queue of any other entries - while True: - try: - entry = self._inode_remove_queue.get(False) - if entry is None: - return - _logger.debug("_inode_remove %s", entry) - if self._inode_op(entry, locked_ops): - self._inode_remove_queue.task_done() - except queue.Empty: - break - - with llfuse.lock: - while len(locked_ops) > 0: - if self._inode_op(locked_ops.popleft(), None): - self._inode_remove_queue.task_done() - for entry in self.inode_cache.evict_candidates(): - self._remove(entry) - except Exception as e: - _logger.exception("_inode_remove") - - def _inode_op(self, op, locked_ops): - """Process an inode operation: attempt to remove an inode - entry, tell the kernel to invalidate a inode metadata or - directory entry, or trigger a cache check. - - """ - if self._shutdown_started.is_set(): - return True - if op[0] == "remove": - if locked_ops is None: - self._remove(op[1]) - return True - else: - locked_ops.append(op) - return False - if op[0] == "invalidate_inode": - _logger.debug("sending invalidate inode %i", op[1]) - llfuse.invalidate_inode(op[1]) - return True - if op[0] == "invalidate_entry": - _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2]) - llfuse.invalidate_entry(op[1], op[2]) - return True - if op[0] == "evict_candidates": - return True + shutting_down = False + while not shutting_down: + tasks_done = 0 + blocking_get = True + while True: + try: + qentry = self._inode_remove_queue.get(blocking_get) + except queue.Empty: + break + + blocking_get = False + if qentry is None: + shutting_down = True + continue + + # Process (or defer) this entry + qentry.inode_op(self, locked_ops) + tasks_done += 1 + + # Give up the reference + qentry = None + + with llfuse.lock: + while locked_ops: + locked_ops.popleft().inode_op(self, None) + for entry in self.inode_cache.evict_candidates(): + self._remove(entry) + + # Unblock _inode_remove_queue.join() only when all of the + # deferred work is done, i.e., after calling inode_op() + # and then evict_candidates(). + for _ in range(tasks_done): + self._inode_remove_queue.task_done() def wait_remove_queue_empty(self): # used by tests @@ -430,13 +436,7 @@ class Inodes(object): # Removed already return - # Tell the kernel it should forget about it. - entry.kernel_invalidate() - - if entry.has_ref(): - # has kernel reference, could still be accessed. - # when the kernel forgets about it, we can delete it. - #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode) + if entry.inode == llfuse.ROOT_INODE: return if entry.in_use(): @@ -444,35 +444,38 @@ class Inodes(object): #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode) return - forget_inode = True - parent = self._entries.get(entry.parent_inode) - if (parent is not None and parent.has_ref()) or entry.inode == llfuse.ROOT_INODE: - # the parent is still referenced, so we'll keep the - # entry but wipe out the stuff under it - forget_inode = False + # Tell the kernel it should forget about it + entry.kernel_invalidate() - if entry.cache_size == 0 and not forget_inode: - # Was cleared already + if entry.has_ref(): + # has kernel reference, could still be accessed. + # when the kernel forgets about it, we can delete it. + #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode) return - if forget_inode: - self.inode_cache.unmanage(entry) - - _logger.debug("InodeCache removing inode %i", entry.inode) + # commit any pending changes + with llfuse.lock_released: + entry.finalize() - # For directories, clear the contents + # Clear the contents entry.clear() - _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s, inode entries %i, type %s", - entry.inode, self.inode_cache.total(), forget_inode, - len(self._entries), type(entry)) - if forget_inode: + if entry.parent_inode is None: + _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s", + entry.inode, entry.cache_size, self.inode_cache.total(), + len(self._entries), type(entry)) + + if entry.cache_uuid: + self._by_uuid[entry.cache_uuid].remove(entry) + if not self._by_uuid[entry.cache_uuid]: + del self._by_uuid[entry.cache_uuid] + entry.cache_uuid = None + + self.inode_cache.unmanage(entry) + del self._entries[entry.inode] entry.inode = None - # stop anything else - with llfuse.lock_released: - entry.finalize() except Exception as e: _logger.exception("failed remove") @@ -480,13 +483,13 @@ class Inodes(object): if entry.has_ref(): # Only necessary if the kernel has previously done a lookup on this # inode and hasn't yet forgotten about it. - self._inode_remove_queue.put(("invalidate_inode", entry.inode)) + self._inode_remove_queue.put(InvalidateInode(entry.inode)) def invalidate_entry(self, entry, name): if entry.has_ref(): # Only necessary if the kernel has previously done a lookup on this # inode and hasn't yet forgotten about it. - self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding)))) + self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding))) def begin_shutdown(self): self._inode_remove_queue.put(None) @@ -499,8 +502,9 @@ class Inodes(object): self.begin_shutdown() self.inode_cache.clear() + self._by_uuid.clear() - for k,v in viewitems(self._entries): + for k,v in self._entries.items(): try: v.finalize() except Exception as e: @@ -511,6 +515,11 @@ class Inodes(object): def forward_slash_subst(self): return self._fsns + def find_by_uuid(self, uuid): + """Return a list of zero or more inode entries corresponding + to this Arvados UUID.""" + return self._by_uuid.get(uuid, []) + def catch_exceptions(orig_func): """Catch uncaught exceptions and log them consistently.""" @@ -653,7 +662,8 @@ class Operations(llfuse.Operations): def destroy(self): _logger.debug("arv-mount destroy: start") - self.begin_shutdown() + with llfuse.lock_released: + self.begin_shutdown() if self.events: self.events.close() @@ -683,14 +693,14 @@ class Operations(llfuse.Operations): old_attrs = properties.get("old_attributes") or {} new_attrs = properties.get("new_attributes") or {} - for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]): + for item in self.inodes.find_by_uuid(ev["object_uuid"]): item.invalidate() oldowner = old_attrs.get("owner_uuid") newowner = ev.get("object_owner_uuid") for parent in ( - self.inodes.inode_cache.find_by_uuid(oldowner) + - self.inodes.inode_cache.find_by_uuid(newowner)): + self.inodes.find_by_uuid(oldowner) + + self.inodes.find_by_uuid(newowner)): parent.invalidate() @getattr_time.time() @@ -701,11 +711,16 @@ class Operations(llfuse.Operations): raise llfuse.FUSEError(errno.ENOENT) e = self.inodes[inode] + self.inodes.touch(e) + parent = None + if e.parent_inode: + parent = self.inodes[e.parent_inode] + self.inodes.touch(parent) entry = llfuse.EntryAttributes() entry.st_ino = inode entry.generation = 0 - entry.entry_timeout = 0 + entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0 entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH @@ -779,11 +794,17 @@ class Operations(llfuse.Operations): if name == '..': inode = p.parent_inode elif isinstance(p, Directory) and name in p: + if p[name].inode is None: + _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None", + parent_inode, name) + raise llfuse.FUSEError(errno.ENOENT) + inode = p[name].inode if inode != None: _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i", parent_inode, name, inode) + self.inodes.touch(self.inodes[inode]) self.inodes[inode].inc_ref() return self.getattr(inode) else: @@ -799,7 +820,7 @@ class Operations(llfuse.Operations): for inode, nlookup in inodes: ent = self.inodes[inode] _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count) - if ent.dec_ref(nlookup) == 0 and ent.dead: + if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None: self.inodes.del_entry(ent) @open_time.time() @@ -919,10 +940,10 @@ class Operations(llfuse.Operations): _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh) # update atime - self.inodes.touch(p) p.inc_use() - self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p)) + self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items()) p.dec_use() + self.inodes.touch(p) return fh @readdir_time.time()