"""
-from __future__ import absolute_import
-from __future__ import division
-from future.utils import viewitems
-from future.utils import native
-from future.utils import listvalues
-from future.utils import listitems
-from future import standard_library
-standard_library.install_aliases()
-from builtins import next
-from builtins import str
-from builtins import object
import os
import llfuse
import errno
import arvados.keep
from prometheus_client import Summary
import queue
+from dataclasses import dataclass
+import typing
from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from .fusefile import StringFile, FuseArvadosFile
+from .fusefile import File, StringFile, FuseArvadosFile
_logger = logging.getLogger('arvados.arvados_fuse')
"""
def __init__(self, cap, min_entries=4):
+ # Standard dictionaries are ordered, but OrderedDict is still better here, see
+ # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
+ # specifically we use move_to_end() which standard dicts don't have.
self._cache_entries = collections.OrderedDict()
- self._by_uuid = {}
self.cap = cap
self._total = 0
self.min_entries = min_entries
return
_logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
- for ent in listvalues(self._cache_entries):
+
+ # Copy this into a deque for two reasons:
+ #
+ # 1. _cache_entries is modified by unmanage() which is called
+ # by _remove
+ #
+ # 2. popping off the front means the reference goes away
+ # immediately intead of sticking around for the lifetime of
+ # "values"
+ values = collections.deque(self._cache_entries.values())
+
+ while values:
if self._total < self.cap or len(self._cache_entries) < self.min_entries:
break
- if ent.cache_size > 0 or ent.dead:
- # if cache_size is zero it's been cleared already
- yield ent
-
- def manage(self, obj):
- """Add a new object to be cache managed.
-
- This means evict_candidates will suggest clearing and removing
- the inode when there is memory pressure.
-
- """
-
- if obj.inode in self._cache_entries:
- return
-
- obj.cache_size = obj.objsize()
- self._total += obj.cache_size
-
- self._cache_entries[obj.inode] = obj
-
- obj.cache_uuid = obj.uuid()
- if obj.cache_uuid:
- if obj.cache_uuid not in self._by_uuid:
- self._by_uuid[obj.cache_uuid] = [obj]
- else:
- if obj not in self._by_uuid[obj.cache_uuid]:
- self._by_uuid[obj.cache_uuid].append(obj)
-
- _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
- obj.inode, obj.cache_size, obj.cache_uuid, self._total, len(self._cache_entries))
+ yield values.popleft()
def unmanage(self, entry):
"""Stop managing an object in the cache.
self._total -= entry.cache_size
entry.cache_size = 0
- # manage the mapping of uuid to object
- if entry.cache_uuid:
- self._by_uuid[entry.cache_uuid].remove(entry)
- if not self._by_uuid[entry.cache_uuid]:
- del self._by_uuid[entry.cache_uuid]
- entry.cache_uuid = None
-
# Now forget about it
del self._cache_entries[entry.inode]
object changing (usually because it has been loaded or
cleared).
+ Adds or removes entries to the cache list based on the object
+ cache size.
+
"""
+
+ if not obj.persisted():
+ return
+
if obj.inode in self._cache_entries:
self._total -= obj.cache_size
- obj.cache_size = obj.objsize()
+
+ obj.cache_size = obj.objsize()
+
+ if obj.cache_size > 0 or obj.parent_inode is None:
self._total += obj.cache_size
+ self._cache_entries[obj.inode] = obj
+ elif obj.cache_size == 0 and obj.inode in self._cache_entries:
+ del self._cache_entries[obj.inode]
def touch(self, obj):
"""Indicate an object was used recently, making it low
"""
if obj.inode in self._cache_entries:
self._cache_entries.move_to_end(obj.inode)
- else:
- self.manage(obj)
-
- def find_by_uuid(self, uuid):
- return self._by_uuid.get(uuid, [])
+ return True
+ return False
def clear(self):
self._cache_entries.clear()
- self._by_uuid.clear()
self._total = 0
+@dataclass
+class RemoveInode:
+    """Queue operation: remove ``entry`` from the inode table.
+
+    Posted by Inodes.del_entry.  The _inode_remove thread first calls
+    inode_op() with its locked_ops deque, which only defers the
+    operation; the actual removal happens when the deferred op is
+    replayed with locked_ops=None inside ``with llfuse.lock``.
+    """
+    entry: typing.Union[Directory, File]
+
+    def inode_op(self, inodes, locked_ops):
+        """Return True if the removal was performed, False if deferred."""
+        if locked_ops is not None:
+            # First pass runs outside llfuse.lock: park this op so it is
+            # replayed later while the lock is held.
+            locked_ops.append(self)
+            return False
+        inodes._remove(self.entry)
+        return True
+
+@dataclass
+class InvalidateInode:
+    """Queue operation: call ``llfuse.invalidate_inode`` for ``inode``.
+
+    Executed immediately by the _inode_remove thread; it is never
+    deferred to locked_ops.
+    """
+    inode: int
+
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_inode(self.inode)
+        # Always complete, so the caller marks the queue task done.
+        return True
+
+@dataclass
+class InvalidateEntry:
+    """Queue operation: call ``llfuse.invalidate_entry`` for entry
+    ``name`` of directory ``inode``.
+
+    Executed immediately by the _inode_remove thread; it is never
+    deferred to locked_ops.
+    """
+    inode: int
+    # Encoded bytes, not str: Inodes.invalidate_entry queues
+    # name.encode(self.encoding), and llfuse.invalidate_entry takes
+    # the entry name as bytes.
+    name: bytes
+
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_entry(self.inode, self.name)
+        # Always complete, so the caller marks the queue task done.
+        return True
+
+@dataclass
+class EvictCandidates:
+    """Queue marker posted by Inodes.cap_cache.
+
+    It carries no data and does nothing itself.  Dequeuing it wakes the
+    _inode_remove thread, which runs the cache eviction pass after it
+    has drained the queue.
+    """
+
+    def inode_op(self, inodes, locked_ops):
+        # Nothing to do; report completion so task_done() is called.
+        return True
+
class Inodes(object):
"""Manage the set of inodes.
self._inode_remove_thread.daemon = True
self._inode_remove_thread.start()
+ self.cap_cache_event = threading.Event()
+ self._by_uuid = collections.defaultdict(list)
+
def __getitem__(self, item):
return self._entries[item]
return iter(self._entries.keys())
def items(self):
- return viewitems(self._entries.items())
+ return self._entries.items()
def __contains__(self, k):
return k in self._entries
"""
entry._atime = time.time()
- self.inode_cache.touch(entry)
- self.cap_cache()
+ if self.inode_cache.touch(entry):
+ self.cap_cache()
def cap_cache(self):
"""Notify the _inode_remove thread to recheck the cache."""
- self._inode_remove_queue.put(("evict_candidates",))
+ if not self.cap_cache_event.is_set():
+ self.cap_cache_event.set()
+ self._inode_remove_queue.put(EvictCandidates())
+
+    def update_uuid(self, entry):
+        """Update the Arvados uuid associated with an inode entry.
+
+        This is used to look up inodes that need to be invalidated
+        when a websocket event indicates the object has changed on the
+        API server.
+
+        The entry is removed from the index list of its previous
+        ``cache_uuid``, and that list is dropped once empty so stale
+        keys do not accumulate.  ``entry.cache_uuid`` is then refreshed
+        from ``entry.uuid()``, and the entry is indexed under the new
+        uuid if it has one.
+
+        """
+        old_uuid = entry.cache_uuid
+        if old_uuid:
+            # _by_uuid is a defaultdict: use .get() so that looking up
+            # an unindexed uuid does not insert an empty list.
+            old_entries = self._by_uuid.get(old_uuid)
+            if old_entries is not None and entry in old_entries:
+                old_entries.remove(entry)
+                if not old_entries:
+                    del self._by_uuid[old_uuid]
+
+        entry.cache_uuid = entry.uuid()
+        if entry.cache_uuid:
+            new_entries = self._by_uuid[entry.cache_uuid]
+            if entry not in new_entries:
+                new_entries.append(entry)
def add_entry(self, entry):
"""Assign a numeric inode to a new entry."""
if entry.inode == llfuse.ROOT_INODE:
entry.inc_ref()
self._entries[entry.inode] = entry
- if entry.persisted():
- # only "persisted" items can be reloaded from the server
- # making them safe to evict automatically.
- self.inode_cache.manage(entry)
+
+ self.update_uuid(entry)
+ self.inode_cache.update_cache_size(entry)
self.cap_cache()
return entry
def del_entry(self, entry):
"""Remove entry from the inode table.
- Put a tombstone marker on it and notify the _inode_remove
- thread to try and remove it.
+ Indicate this inode entry is pending deletion by setting
+ parent_inode to None. Notify the _inode_remove thread to try
+ and remove it.
"""
- entry.dead = True
- self._inode_remove_queue.put(("remove", entry))
+ entry.parent_inode = None
+ self._inode_remove_queue.put(RemoveInode(entry))
_logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def _inode_remove(self):
locked_ops = collections.deque()
while True:
- try:
- entry = self._inode_remove_queue.get(True)
- if entry is None:
+ blocking_get = True
+ while True:
+ try:
+ qentry = self._inode_remove_queue.get(blocking_get)
+ except queue.Empty:
+ break
+ blocking_get = False
+ if qentry is None:
return
+
+ if self._shutdown_started.is_set():
+ continue
+
# Process this entry
- _logger.debug("_inode_remove %s", entry)
- if self._inode_op(entry, locked_ops):
+ if qentry.inode_op(self, locked_ops):
self._inode_remove_queue.task_done()
- # Drain the queue of any other entries
- while True:
- try:
- entry = self._inode_remove_queue.get(False)
- if entry is None:
- return
- _logger.debug("_inode_remove %s", entry)
- if self._inode_op(entry, locked_ops):
- self._inode_remove_queue.task_done()
- except queue.Empty:
- break
-
- with llfuse.lock:
- while len(locked_ops) > 0:
- if self._inode_op(locked_ops.popleft(), None):
- self._inode_remove_queue.task_done()
- for entry in self.inode_cache.evict_candidates():
- self._remove(entry)
- except Exception as e:
- _logger.exception("_inode_remove")
+ # Give up the reference
+ qentry = None
- def _inode_op(self, op, locked_ops):
- """Process an inode operation: attempt to remove an inode
- entry, tell the kernel to invalidate a inode metadata or
- directory entry, or trigger a cache check.
-
- """
- if self._shutdown_started.is_set():
- return True
- if op[0] == "remove":
- if locked_ops is None:
- self._remove(op[1])
- return True
- else:
- locked_ops.append(op)
- return False
- if op[0] == "invalidate_inode":
- _logger.debug("sending invalidate inode %i", op[1])
- llfuse.invalidate_inode(op[1])
- return True
- if op[0] == "invalidate_entry":
- _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
- llfuse.invalidate_entry(op[1], op[2])
- return True
- if op[0] == "evict_candidates":
- return True
+ with llfuse.lock:
+ while locked_ops:
+ if locked_ops.popleft().inode_op(self, None):
+ self._inode_remove_queue.task_done()
+ self.cap_cache_event.clear()
+ for entry in self.inode_cache.evict_candidates():
+ self._remove(entry)
def wait_remove_queue_empty(self):
# used by tests
# Removed already
return
- # Tell the kernel it should forget about it.
- entry.kernel_invalidate()
-
- if entry.has_ref():
- # has kernel reference, could still be accessed.
- # when the kernel forgets about it, we can delete it.
- #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+ if entry.inode == llfuse.ROOT_INODE:
return
if entry.in_use():
#_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
return
- forget_inode = True
- parent = self._entries.get(entry.parent_inode)
- if (parent is not None and parent.has_ref()) or entry.inode == llfuse.ROOT_INODE:
- # the parent is still referenced, so we'll keep the
- # entry but wipe out the stuff under it
- forget_inode = False
+ # Tell the kernel it should forget about it
+ entry.kernel_invalidate()
- if entry.cache_size == 0 and not forget_inode:
- # Was cleared already
+ if entry.has_ref():
+ # has kernel reference, could still be accessed.
+ # when the kernel forgets about it, we can delete it.
+ #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
return
- if forget_inode:
- self.inode_cache.unmanage(entry)
-
- _logger.debug("InodeCache removing inode %i", entry.inode)
+ # commit any pending changes
+ with llfuse.lock_released:
+ entry.finalize()
- # For directories, clear the contents
+ # Clear the contents
entry.clear()
- _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s, inode entries %i, type %s",
- entry.inode, self.inode_cache.total(), forget_inode,
- len(self._entries), type(entry))
- if forget_inode:
+ if entry.parent_inode is None:
+ _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
+ entry.inode, entry.cache_size, self.inode_cache.total(),
+ len(self._entries), type(entry))
+
+ if entry.cache_uuid:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
+ entry.cache_uuid = None
+
+ self.inode_cache.unmanage(entry)
+
del self._entries[entry.inode]
entry.inode = None
- # stop anything else
- with llfuse.lock_released:
- entry.finalize()
except Exception as e:
_logger.exception("failed remove")
if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- self._inode_remove_queue.put(("invalidate_inode", entry.inode))
+ self._inode_remove_queue.put(InvalidateInode(entry.inode))
def invalidate_entry(self, entry, name):
if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
+ self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
def begin_shutdown(self):
self._inode_remove_queue.put(None)
self.begin_shutdown()
self.inode_cache.clear()
+ self._by_uuid.clear()
- for k,v in viewitems(self._entries):
+ for k,v in self._entries.items():
try:
v.finalize()
except Exception as e:
def forward_slash_subst(self):
return self._fsns
+    def find_by_uuid(self, uuid):
+        """Return a list of zero or more inode entries corresponding
+        to this Arvados UUID.
+
+        For an indexed uuid this is the index's own list, not a copy,
+        so callers should treat it as read-only.  An unknown uuid
+        yields a fresh empty list and is not added to the index.
+        """
+        if uuid not in self._by_uuid:
+            return []
+        return self._by_uuid[uuid]
+
def catch_exceptions(orig_func):
"""Catch uncaught exceptions and log them consistently."""
def destroy(self):
_logger.debug("arv-mount destroy: start")
- self.begin_shutdown()
+ with llfuse.lock_released:
+ self.begin_shutdown()
if self.events:
self.events.close()
old_attrs = properties.get("old_attributes") or {}
new_attrs = properties.get("new_attributes") or {}
- for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
+ for item in self.inodes.find_by_uuid(ev["object_uuid"]):
item.invalidate()
oldowner = old_attrs.get("owner_uuid")
newowner = ev.get("object_owner_uuid")
for parent in (
- self.inodes.inode_cache.find_by_uuid(oldowner) +
- self.inodes.inode_cache.find_by_uuid(newowner)):
+ self.inodes.find_by_uuid(oldowner) +
+ self.inodes.find_by_uuid(newowner)):
parent.invalidate()
@getattr_time.time()
raise llfuse.FUSEError(errno.ENOENT)
e = self.inodes[inode]
+ self.inodes.touch(e)
+ parent = None
+ if e.parent_inode:
+ parent = self.inodes[e.parent_inode]
+ self.inodes.touch(parent)
entry = llfuse.EntryAttributes()
entry.st_ino = inode
entry.generation = 0
- entry.entry_timeout = 0
+ entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
if name == '..':
inode = p.parent_inode
elif isinstance(p, Directory) and name in p:
+ if p[name].inode is None:
+ _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
+ parent_inode, name)
+ raise llfuse.FUSEError(errno.ENOENT)
+
inode = p[name].inode
if inode != None:
_logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
parent_inode, name, inode)
+ self.inodes.touch(self.inodes[inode])
self.inodes[inode].inc_ref()
return self.getattr(inode)
else:
for inode, nlookup in inodes:
ent = self.inodes[inode]
_logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
- if ent.dec_ref(nlookup) == 0 and ent.dead:
+ if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
self.inodes.del_entry(ent)
@open_time.time()
_logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
# update atime
- self.inodes.touch(p)
p.inc_use()
- self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+ self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
p.dec_use()
+ self.inodes.touch(p)
return fh
@readdir_time.time()