The FUSE driver supports the Arvados event bus. When an event is received for
an object that is live in the inode cache, that object is immediately updated.
+Implementation note: in the code, the terms 'object', 'entry', and
+'inode' are used somewhat interchangeably, but generally mean an
+arvados_fuse.File or arvados_fuse.Directory object which has a numeric
+inode assigned to it and appears in the Inodes._entries dictionary.
+
"""
from __future__ import absolute_import
from prometheus_client import Summary
import queue
-# Default _notify_queue has a limit of 1000 items, but it really needs to be
-# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
-# details.
-
-if hasattr(llfuse, 'capi'):
- # llfuse < 0.42
- llfuse.capi._notify_queue = queue.Queue()
-else:
- # llfuse >= 0.42
- llfuse._notify_queue = queue.Queue()
-
-LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
-
from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
from .fusefile import StringFile, FuseArvadosFile
class DirectoryHandle(Handle):
"""Connects a numeric file handle to a Directory object that has
- been opened by the client."""
+ been opened by the client.
+
+ DirectoryHandle is used by opendir() and readdir() to get
+ directory listings. Entries returned by readdir() don't increment
+ the lookup count (kernel references), so we increment our internal
+ "use count" instead, to keep an item from being removed mid-read.
+
+ """
def __init__(self, fh, dirobj, entries):
super(DirectoryHandle, self).__init__(fh, dirobj)
self.entries = entries
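+ # Pin each listed entry with an internal use count so it can't
+ # be cleared out from under us while this handle is open.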
+ for ent in self.entries:
+ ent[1].inc_use()
+
+ def release(self):
+ for ent in self.entries:
+ ent[1].dec_use()
+ super(DirectoryHandle, self).release()
+
class InodeCache(object):
"""Records the memory footprint of objects and when they are last used.
- When the cache limit is exceeded, the least recently used objects are
- cleared. Clearing the object means discarding its contents to release
- memory. The next time the object is accessed, it must be re-fetched from
- the server. Note that the inode cache limit is a soft limit; the cache
- limit may be exceeded if necessary to load very large objects, it may also
- be exceeded if open file handles prevent objects from being cleared.
+ When the cache limit is exceeded, the least recently used objects
+ are cleared. Clearing the object means discarding its contents to
+ release memory. The next time the object is accessed, it must be
+ re-fetched from the server. Note that the inode cache limit is a
+ soft limit; the cache limit may be exceeded if necessary to load
+ very large projects or collections; it may also be exceeded if an
+ inode can't be safely discarded based on kernel lookups
+ (has_ref()) or internal use count (in_use()).
"""
def __init__(self, cap, min_entries=4):
- self._entries = collections.OrderedDict()
+ self._cache_entries = collections.OrderedDict()
self._by_uuid = {}
self.cap = cap
self._total = 0
def total(self):
return self._total
- def _remove(self, obj, clear):
- if clear:
- # Kernel behavior seems to be that if a file is
- # referenced, its parents remain referenced too. This
- # means has_ref() exits early when a collection is not
- # candidate for eviction.
- #
- # By contrast, in_use() doesn't increment references on
- # parents, so it requires a full tree walk to determine if
- # a collection is a candidate for eviction. This takes
- # .07s for 240000 files, which becomes a major drag when
- # cap_cache is being called several times a second and
- # there are multiple non-evictable collections in the
- # cache.
- #
- # So it is important for performance that we do the
- # has_ref() check first.
-
- if obj.has_ref(True):
- _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
- return
+ def evict_candidates(self):
+ """Yield entries that are candidates to be evicted
+ and stop when the cache total has shrunk sufficiently.
- if obj.in_use():
- _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
- return
+ Implements an LRU cache: when an item is added or touch()ed it
+ moves to the back of the OrderedDict, so the items at the front
+ are the oldest. The Inodes._remove() method determines whether
+ an entry can actually be removed safely.
- obj.kernel_invalidate()
- _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
- obj.clear()
+ """
- # The llfuse lock is released in del_entry(), which is called by
- # Directory.clear(). While the llfuse lock is released, it can happen
- # that a reentrant call removes this entry before this call gets to it.
- # Ensure that the entry is still valid before trying to remove it.
- if obj.inode not in self._entries:
+ if self._total <= self.cap:
return
- self._total -= obj.cache_size
- del self._entries[obj.inode]
+ _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
+ for ent in listvalues(self._cache_entries):
+ if self._total < self.cap or len(self._cache_entries) < self.min_entries:
+ break
+ if ent.cache_size > 0 or ent.dead:
+ # (if cache_size is zero it has already been cleared; dead
+ # entries still need to be removed from the inode table)
+ yield ent
+
+ def manage(self, obj):
+ """Add a new object to be cache managed.
+
+ This means evict_candidates will suggest clearing and removing
+ the inode when there is memory pressure.
+
+ """
+
+ if obj.inode in self._cache_entries:
+ return
+
+ obj.cache_size = obj.objsize()
+ self._total += obj.cache_size
+
+ self._cache_entries[obj.inode] = obj
+
+ obj.cache_uuid = obj.uuid()
if obj.cache_uuid:
- self._by_uuid[obj.cache_uuid].remove(obj)
- if not self._by_uuid[obj.cache_uuid]:
- del self._by_uuid[obj.cache_uuid]
- obj.cache_uuid = None
- if clear:
- _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
+ if obj.cache_uuid not in self._by_uuid:
+ self._by_uuid[obj.cache_uuid] = [obj]
+ else:
+ if obj not in self._by_uuid[obj.cache_uuid]:
+ self._by_uuid[obj.cache_uuid].append(obj)
- def cap_cache(self):
- if self._total > self.cap:
- for ent in listvalues(self._entries):
- if self._total < self.cap or len(self._entries) < self.min_entries:
- break
- self._remove(ent, True)
+ _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
+ obj.inode, obj.cache_size, obj.cache_uuid, self._total, len(self._cache_entries))
- def manage(self, obj):
- if obj.persisted():
+ def unmanage(self, entry):
+ """Stop managing an object in the cache.
+
+ This happens when an object is being removed from the inode
+ entries table.
+
+ """
+
+ if entry.inode not in self._cache_entries:
+ return
+
+ # update the cache size running total
+ self._total -= entry.cache_size
+ entry.cache_size = 0
+
+ # manage the mapping of uuid to object
+ if entry.cache_uuid:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
+ entry.cache_uuid = None
+
+ # Now forget about it
+ del self._cache_entries[entry.inode]
+
+ def update_cache_size(self, obj):
+ """Update the cache total in response to the footprint of an
+ object changing (usually because it has been loaded or
+ cleared).
+
+ """
+ if obj.inode in self._cache_entries:
+ self._total -= obj.cache_size
obj.cache_size = obj.objsize()
- self._entries[obj.inode] = obj
- obj.cache_uuid = obj.uuid()
- if obj.cache_uuid:
- if obj.cache_uuid not in self._by_uuid:
- self._by_uuid[obj.cache_uuid] = [obj]
- else:
- if obj not in self._by_uuid[obj.cache_uuid]:
- self._by_uuid[obj.cache_uuid].append(obj)
- self._total += obj.objsize()
- _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
- obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
- self.cap_cache()
+ self._total += obj.cache_size
def touch(self, obj):
- if obj.persisted():
- if obj.inode in self._entries:
- self._remove(obj, False)
- self.manage(obj)
+ """Indicate an object was used recently, making it low
+ priority to be removed from the cache.
- def unmanage(self, obj):
- if obj.persisted() and obj.inode in self._entries:
- self._remove(obj, True)
+ """
+ if obj.inode in self._cache_entries:
+ self._cache_entries.move_to_end(obj.inode)
+ else:
+ self.manage(obj)
def find_by_uuid(self, uuid):
return self._by_uuid.get(uuid, [])
def clear(self):
- self._entries.clear()
+ self._cache_entries.clear()
self._by_uuid.clear()
self._total = 0
+
class Inodes(object):
- """Manage the set of inodes. This is the mapping from a numeric id
- to a concrete File or Directory object"""
+ """Manage the set of inodes.
+
+ This is the mapping from a numeric id to a concrete File or
+ Directory object.
- def __init__(self, inode_cache, encoding="utf-8"):
+ """
+
+ def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
self.inode_cache = inode_cache
self.encoding = encoding
- self.deferred_invalidations = []
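+ # fsns is the ForwardSlashNameSubstitution value from the cluster
+ # config (see Operations.__init__), shared by all Directory objects
+ # via forward_slash_subst().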
+ self._fsns = fsns
+ self._shutdown_started = shutdown_started or threading.Event()
+
+ self._inode_remove_queue = queue.Queue()
+ self._inode_remove_thread = threading.Thread(None, self._inode_remove)
+ self._inode_remove_thread.daemon = True
+ self._inode_remove_thread.start()
def __getitem__(self, item):
return self._entries[item]
return k in self._entries
def touch(self, entry):
+ """Update the access time, adjust the cache position, and
+ notify the _inode_remove thread to recheck the cache.
+
+ """
+
entry._atime = time.time()
self.inode_cache.touch(entry)
+ self.cap_cache()
+
+ def cap_cache(self):
+ """Notify the _inode_remove thread to recheck the cache."""
+ self._inode_remove_queue.put(("evict_candidates",))
def add_entry(self, entry):
+ """Assign a numeric inode to a new entry."""
+
entry.inode = next(self._counter)
if entry.inode == llfuse.ROOT_INODE:
entry.inc_ref()
self._entries[entry.inode] = entry
- self.inode_cache.manage(entry)
+ if entry.persisted():
+ # only "persisted" items can be reloaded from the server
+ # making them safe to evict automatically.
+ self.inode_cache.manage(entry)
+ self.cap_cache()
return entry
def del_entry(self, entry):
- if entry.ref_count == 0:
- self.inode_cache.unmanage(entry)
- del self._entries[entry.inode]
+ """Remove entry from the inode table.
+
+ Put a tombstone marker on it and notify the _inode_remove
+ thread to try and remove it.
+
+ """
+
+ entry.dead = True
+ self._inode_remove_queue.put(("remove", entry))
+ _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+ def _inode_remove(self):
+ """Background thread to handle tasks related to invalidating
+ inodes in the kernel, and removing objects from the inodes
+ table entirely.
+
+ """
+
+ locked_ops = collections.deque()
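+ # Ops that need the llfuse lock are deferred to locked_ops by
+ # _inode_op() and processed in one batch below while the lock is held.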
+ while True:
+ try:
+ entry = self._inode_remove_queue.get(True)
+ if entry is None:
+ return
+ # Process this entry
+ _logger.debug("_inode_remove %s", entry)
+ if self._inode_op(entry, locked_ops):
+ self._inode_remove_queue.task_done()
+
+ # Drain the queue of any other entries
+ while True:
+ try:
+ entry = self._inode_remove_queue.get(False)
+ if entry is None:
+ return
+ _logger.debug("_inode_remove %s", entry)
+ if self._inode_op(entry, locked_ops):
+ self._inode_remove_queue.task_done()
+ except queue.Empty:
+ break
+
+ with llfuse.lock:
+ while len(locked_ops) > 0:
+ if self._inode_op(locked_ops.popleft(), None):
+ self._inode_remove_queue.task_done()
+ for entry in self.inode_cache.evict_candidates():
+ self._remove(entry)
+ except Exception as e:
+ _logger.exception("_inode_remove")
+
+ def _inode_op(self, op, locked_ops):
+ """Process an inode operation: attempt to remove an inode
+ entry, tell the kernel to invalidate a inode metadata or
+ directory entry, or trigger a cache check.
+
+ """
+ if self._shutdown_started.is_set():
+ return True
+ if op[0] == "remove":
+ if locked_ops is None:
+ self._remove(op[1])
+ return True
+ else:
+ locked_ops.append(op)
+ return False
+ if op[0] == "invalidate_inode":
+ _logger.debug("sending invalidate inode %i", op[1])
+ llfuse.invalidate_inode(op[1])
+ return True
+ if op[0] == "invalidate_entry":
+ _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
+ llfuse.invalidate_entry(op[1], op[2])
+ return True
+ if op[0] == "evict_candidates":
+ return True
+
+ def wait_remove_queue_empty(self):
+ # used by tests
+ self._inode_remove_queue.join()
+
+ def _remove(self, entry):
+ """Remove an inode entry if possible.
+
+ If the entry is still referenced or in use, don't do anything.
+ If the entry is not referenced but its parent is still referenced,
+ clear any data held by the object (which may include directory
+ entries under the object) but don't remove it from the inode
+ table.
+
+ """
+ try:
+ if entry.inode is None:
+ # Removed already
+ return
+
+ # Tell the kernel it should forget about it.
+ entry.kernel_invalidate()
+
+ if entry.has_ref():
+ # has kernel reference, could still be accessed.
+ # when the kernel forgets about it, we can delete it.
+ #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+ return
+
+ if entry.in_use():
+ # referenced internally, stay pinned
+ #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+ return
+
+ forget_inode = True
+ parent = self._entries.get(entry.parent_inode)
+ if (parent is not None and parent.has_ref()) or entry.inode == llfuse.ROOT_INODE:
+ # the parent is still referenced, so we'll keep the
+ # entry but wipe out the stuff under it
+ forget_inode = False
+
+ if entry.cache_size == 0 and not forget_inode:
+ # Was cleared already
+ return
+
+ if forget_inode:
+ self.inode_cache.unmanage(entry)
+
+ _logger.debug("InodeCache removing inode %i", entry.inode)
+
+ # For directories, clear the contents
+ entry.clear()
+
+ _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s, inode entries %i, type %s",
+ entry.inode, self.inode_cache.total(), forget_inode,
+ len(self._entries), type(entry))
+ if forget_inode:
+ del self._entries[entry.inode]
+ entry.inode = None
+
+ # stop anything else
with llfuse.lock_released:
entry.finalize()
- entry.inode = None
- else:
- entry.dead = True
- _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+ except Exception as e:
+ _logger.exception("failed remove")
def invalidate_inode(self, entry):
- if entry.has_ref(False):
+ if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- llfuse.invalidate_inode(entry.inode)
+ self._inode_remove_queue.put(("invalidate_inode", entry.inode))
def invalidate_entry(self, entry, name):
- if entry.has_ref(False):
+ if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+ self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
+
+ def begin_shutdown(self):
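+ # A None sentinel on the queue tells the _inode_remove thread to exit.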
+ self._inode_remove_queue.put(None)
+ if self._inode_remove_thread is not None:
+ self._inode_remove_thread.join()
+ self._inode_remove_thread = None
def clear(self):
+ with llfuse.lock_released:
+ self.begin_shutdown()
+
self.inode_cache.clear()
for k,v in viewitems(self._entries):
self._entries.clear()
+ def forward_slash_subst(self):
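+ """Return the configured ForwardSlashNameSubstitution value, or None."""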
+ return self._fsns
+
def catch_exceptions(orig_func):
"""Catch uncaught exceptions and log them consistently."""
rename_time = fuse_time.labels(op='rename')
flush_time = fuse_time.labels(op='flush')
- def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
+ def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
super(Operations, self).__init__()
self._api_client = api_client
if not inode_cache:
inode_cache = InodeCache(cap=256*1024*1024)
- self.inodes = Inodes(inode_cache, encoding=encoding)
+
+ if fsns is None:
+ try:
+ fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
+ except KeyError:
+ # old API server with no FSNS config
+ fsns = '_'
+ else:
+ if fsns == '' or fsns == '/':
+ fsns = None
+
+ # If we get overlapping shutdown events (e.g., fusermount -u
+ # -z and operations.destroy()) llfuse calls forget() on inodes
+ # that have already been deleted. To avoid this, we make
+ # forget() a no-op if called after destroy().
+ self._shutdown_started = threading.Event()
+
+ self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
+ shutdown_started=self._shutdown_started)
self.uid = uid
self.gid = gid
self.enable_write = enable_write
# is fully initialized should wait() on this event object.
self.initlock = threading.Event()
- # If we get overlapping shutdown events (e.g., fusermount -u
- # -z and operations.destroy()) llfuse calls forget() on inodes
- # that have already been deleted. To avoid this, we make
- # forget() a no-op if called after destroy().
- self._shutdown_started = threading.Event()
-
self.num_retries = num_retries
self.read_counter = arvados.keep.Counter()
def metric_count_func(self, opname):
return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+ def begin_shutdown(self):
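+ """Note that shutdown has started and stop the inode remove thread."""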
+ self._shutdown_started.set()
+ self.inodes.begin_shutdown()
+
@destroy_time.time()
@catch_exceptions
def destroy(self):
- self._shutdown_started.set()
+ _logger.debug("arv-mount destroy: start")
+
+ self.begin_shutdown()
+
if self.events:
self.events.close()
self.events = None
- # Different versions of llfuse require and forbid us to
- # acquire the lock here. See #8345#note-37, #10805#note-9.
- if LLFUSE_VERSION_0 and llfuse.lock.acquire():
- # llfuse < 0.42
- self.inodes.clear()
- llfuse.lock.release()
- else:
- # llfuse >= 0.42
- self.inodes.clear()
+ self.inodes.clear()
+
+ _logger.debug("arv-mount destroy: complete")
+
def access(self, inode, mode, ctx):
return True
@catch_exceptions
def getattr(self, inode, ctx=None):
if inode not in self.inodes:
+ _logger.debug("arv-mount getattr: inode %i missing", inode)
raise llfuse.FUSEError(errno.ENOENT)
e = self.inodes[inode]
if name == '.':
inode = parent_inode
- else:
- if parent_inode in self.inodes:
- p = self.inodes[parent_inode]
- self.inodes.touch(p)
- if name == '..':
- inode = p.parent_inode
- elif isinstance(p, Directory) and name in p:
- inode = p[name].inode
+ elif parent_inode in self.inodes:
+ p = self.inodes[parent_inode]
+ self.inodes.touch(p)
+ if name == '..':
+ inode = p.parent_inode
+ elif isinstance(p, Directory) and name in p:
+ inode = p[name].inode
if inode != None:
_logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
if inode in self.inodes:
p = self.inodes[inode]
else:
+ _logger.debug("arv-mount open: inode %i missing", inode)
raise llfuse.FUSEError(errno.ENOENT)
if isinstance(p, Directory):
finally:
self._filehandles[fh].release()
del self._filehandles[fh]
- self.inodes.inode_cache.cap_cache()
+ self.inodes.cap_cache()
def releasedir(self, fh):
self.release(fh)
if inode in self.inodes:
p = self.inodes[inode]
else:
+ _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
raise llfuse.FUSEError(errno.ENOENT)
if not isinstance(p, Directory):
if p.parent_inode in self.inodes:
parent = self.inodes[p.parent_inode]
else:
+ _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
raise llfuse.FUSEError(errno.EIO)
+ _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
+
# update atime
self.inodes.touch(p)
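+ # Hold a use count on p so it can't be cleared while we take a
+ # snapshot of its entries for the DirectoryHandle.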
+ p.inc_use()
self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+ p.dec_use()
return fh
@readdir_time.time()
e = off
while e < len(handle.entries):
- if handle.entries[e][1].inode in self.inodes:
- yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
+ ent = handle.entries[e]
+ if ent[1].inode in self.inodes:
+ yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
e += 1
@statfs_time.time()
and the value referencing a File or Directory object.
"""
- def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
+ def __init__(self, parent_inode, inodes, enable_write, filters):
"""parent_inode is the integer inode number"""
super(Directory, self).__init__()
raise Exception("parent_inode should be an int")
self.parent_inode = parent_inode
self.inodes = inodes
- self.apiconfig = apiconfig
self._entries = {}
self._mtime = time.time()
self._enable_write = enable_write
else:
yield [f_name, *f[1:]]
- def forward_slash_subst(self):
- if not hasattr(self, '_fsns'):
- self._fsns = None
- config = self.apiconfig()
- try:
- self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
- except KeyError:
- # old API server with no FSNS config
- self._fsns = '_'
- else:
- if self._fsns == '' or self._fsns == '/':
- self._fsns = None
- return self._fsns
-
def unsanitize_filename(self, incoming):
"""Replace ForwardSlashNameSubstitution value with /"""
- fsns = self.forward_slash_subst()
+ fsns = self.inodes.forward_slash_subst()
if isinstance(fsns, str):
return incoming.replace(fsns, '/')
else:
elif dirty == '..':
return '__'
else:
- fsns = self.forward_slash_subst()
+ fsns = self.inodes.forward_slash_subst()
if isinstance(fsns, str):
dirty = dirty.replace('/', fsns)
return _disallowed_filename_characters.sub('_', dirty)
self.inodes.touch(self)
super(Directory, self).fresh()
+ def objsize(self):
+ # This is a very rough guess of the amount of overhead involved for
+ # each directory entry (128 bytes is 16 * 8-byte pointers).
+ return len(self._entries) * 128
+
def merge(self, items, fn, same, new_entry):
"""Helper method for updating the contents of the directory.
entries that are the same in both the old and new lists, create new
entries, and delete old entries missing from the new list.
- :items: iterable with new directory contents
+ Arguments:
+ * items: Iterable --- New directory contents
- :fn: function to take an entry in 'items' and return the desired file or
+ * fn: Callable --- Takes an entry in 'items' and returns the desired file or
directory name, or None if this entry should be skipped
- :same: function to compare an existing entry (a File or Directory
+ * same: Callable --- Compare an existing entry (a File or Directory
object) with an entry in the items list to determine whether to keep
the existing entry.
- :new_entry: function to create a new directory entry (File or Directory
+ * new_entry: Callable --- Create a new directory entry (File or Directory
object) from an entry in the items list.
"""
changed = False
for i in items:
name = self.sanitize_filename(fn(i))
- if name:
- if name in oldentries and same(oldentries[name], i):
+ if not name:
+ continue
+ if name in oldentries:
+ ent = oldentries[name]
+ if same(ent, i):
# move existing directory entry over
- self._entries[name] = oldentries[name]
+ self._entries[name] = ent
del oldentries[name]
- else:
- _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
- # create new directory entry
- ent = new_entry(i)
- if ent is not None:
- self._entries[name] = self.inodes.add_entry(ent)
- changed = True
+ self.inodes.inode_cache.touch(ent)
+
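+ # Second pass: create entries for items that were not moved over
+ # from oldentries above.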
+ for i in items:
+ name = self.sanitize_filename(fn(i))
+ if not name:
+ continue
+ if name not in self._entries:
+ # create new directory entry
+ ent = new_entry(i)
+ if ent is not None:
+ self._entries[name] = self.inodes.add_entry(ent)
+ changed = True
+ _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
# delete any other directory entries that were not found in 'items'
for i in oldentries:
if changed:
self.inodes.invalidate_inode(self)
self._mtime = time.time()
+ self.inodes.inode_cache.update_cache_size(self)
self.fresh()
return True
return False
- def has_ref(self, only_children):
- if super(Directory, self).has_ref(only_children):
- return True
- for v in self._entries.values():
- if v.has_ref(False):
- return True
- return False
-
def clear(self):
"""Delete all entries"""
oldentries = self._entries
self._entries = {}
+ self.invalidate()
for n in oldentries:
- oldentries[n].clear()
self.inodes.del_entry(oldentries[n])
- self.invalidate()
+ self.inodes.inode_cache.update_cache_size(self)
def kernel_invalidate(self):
# Invalidating the dentry on the parent implies invalidating all paths
# below it as well.
- parent = self.inodes[self.parent_inode]
+ if self.parent_inode in self.inodes:
+ parent = self.inodes[self.parent_inode]
+ else:
+ # parent was removed already.
+ return
# Find self on the parent in order to invalidate this path.
# Calling the public items() method might trigger a refresh,
self.inodes.invalidate_entry(parent, k)
break
+ self.inodes.invalidate_inode(self)
+
def mtime(self):
return self._mtime
"""
- def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
- super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
- self.apiconfig = apiconfig
+ def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
+ super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
self.collection = collection
self.collection_root = collection_root
self.collection_record_file = None
self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
self.inode,
self.inodes,
- self.apiconfig,
self._enable_write,
self._filters,
item,
super(CollectionDirectoryBase, self).clear()
self.collection = None
+ def objsize(self):
+ # objsize for the whole collection is represented at the root,
+ # don't double-count it
+ return 0
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
self.api = api
self.num_retries = num_retries
self._poll = True
self.collection_record_file.invalidate()
self.inodes.invalidate_inode(self.collection_record_file)
_logger.debug("%s invalidated collection record file", self)
+ self.inodes.inode_cache.update_cache_size(self)
self.fresh()
def uuid(self):
return (self.collection_locator is not None)
def objsize(self):
- # This is an empirically-derived heuristic to estimate the memory used
- # to store this collection's metadata. Calculating the memory
- # footprint directly would be more accurate, but also more complicated.
- return self._manifest_size * 128
+ # This is a very rough guess of the memory overhead for a
+ # collection: the manifest text itself (which the Collection
+ # class keeps in memory), the block identifiers that get
+ # copied into their own strings, plus the rest of the overhead
+ # of the Python objects.
+ return self._manifest_size * 4
def finalize(self):
if self.collection is not None:
if self.writable():
- self.collection.save()
+ try:
+ self.collection.save()
+ except Exception as e:
+ _logger.exception("Failed to save collection %s", self.collection_locator)
self.collection.stop_threads()
def clear(self):
if self.collection is not None:
self.collection.stop_threads()
- super(CollectionDirectory, self).clear()
self._manifest_size = 0
+ super(CollectionDirectory, self).clear()
class TmpCollectionDirectory(CollectionDirectoryBase):
# This is always enable_write=True because it never tries to
# save to the backend
super(TmpCollectionDirectory, self).__init__(
- parent_inode, inodes, api_client.config, True, filters, collection, self)
+ parent_inode, inodes, True, filters, collection, self)
self.populate(self.mtime())
def on_event(self, *args, **kwargs):
""".lstrip()
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
- super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
"""A special directory that contains as subdirectories all tags visible to the user."""
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self._poll = True
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.tag = tag
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
project_object, poll=True, poll_time=3, storage_classes=None):
- super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
self._current_user = None
self._full_listing = False
self.storage_classes = storage_classes
+ self.recursively_contained = False
+
+ # Filter groups can contain themselves, which causes tools
+ # that walk the filesystem to get stuck in an infinite loop,
+ # so suppress returning a listing in that case.
+ if self.project_object.get("group_class") == "filter":
+ iter_parent_inode = parent_inode
+ while iter_parent_inode != llfuse.ROOT_INODE:
+ parent_dir = self.inodes[iter_parent_inode]
+ if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
+ self.recursively_contained = True
+ break
+ iter_parent_inode = parent_dir.parent_inode
def want_event_subscribe(self):
return True
self.project_object_file = ObjectFile(self.inode, self.project_object)
self.inodes.add_entry(self.project_object_file)
- if not self._full_listing:
+ if self.recursively_contained or not self._full_listing:
return True
def samefn(a, i):
*self._filters_for('collections', qualified=True),
],
) if obj['current_version_uuid'] == obj['uuid'])
-
# end with llfuse.lock_released, re-acquire lock
self.merge(contents,
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
exclude, poll=False, poll_time=60, storage_classes=None):
- super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)