X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8d3cf07be66fb6b8a58c3999783ebc753c30428f..8dedd02357a95a0ae2c7961c8f1d0896b6311b3b:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index e3b8dd4c2c..9c78805107 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -36,7 +36,9 @@ class Directory(FreshBase): and the value referencing a File or Directory object. """ - def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters): + __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters") + + def __init__(self, parent_inode, inodes, enable_write, filters): """parent_inode is the integer inode number""" super(Directory, self).__init__() @@ -46,7 +48,6 @@ class Directory(FreshBase): raise Exception("parent_inode should be an int") self.parent_inode = parent_inode self.inodes = inodes - self.apiconfig = apiconfig self._entries = {} self._mtime = time.time() self._enable_write = enable_write @@ -64,23 +65,9 @@ class Directory(FreshBase): else: yield [f_name, *f[1:]] - def forward_slash_subst(self): - if not hasattr(self, '_fsns'): - self._fsns = None - config = self.apiconfig() - try: - self._fsns = config["Collections"]["ForwardSlashNameSubstitution"] - except KeyError: - # old API server with no FSNS config - self._fsns = '_' - else: - if self._fsns == '' or self._fsns == '/': - self._fsns = None - return self._fsns - def unsanitize_filename(self, incoming): """Replace ForwardSlashNameSubstitution value with /""" - fsns = self.forward_slash_subst() + fsns = self.inodes.forward_slash_subst() if isinstance(fsns, str): return incoming.replace(fsns, '/') else: @@ -99,7 +86,7 @@ class Directory(FreshBase): elif dirty == '..': return '__' else: - fsns = self.forward_slash_subst() + fsns = self.inodes.forward_slash_subst() if isinstance(fsns, str): dirty = dirty.replace('/', fsns) return _disallowed_filename_characters.sub('_', dirty) @@ -150,6 +137,10 @@ class Directory(FreshBase): self.inodes.touch(self) super(Directory, self).fresh() + def objsize(self): + # Rough estimate of memory footprint based on using pympler + return len(self._entries) * 1024 + def merge(self, items, fn, same, new_entry): """Helper method for updating the contents of the directory. @@ -157,16 +148,17 @@ class Directory(FreshBase): entries that are the same in both the old and new lists, create new entries, and delete old entries missing from the new list. - :items: iterable with new directory contents + Arguments: + * items: Iterable --- New directory contents - :fn: function to take an entry in 'items' and return the desired file or + * fn: Callable --- Takes an entry in 'items' and return the desired file or directory name, or None if this entry should be skipped - :same: function to compare an existing entry (a File or Directory + * same: Callable --- Compare an existing entry (a File or Directory object) with an entry in the items list to determine whether to keep the existing entry. - :new_entry: function to create a new directory entry (File or Directory + * new_entry: Callable --- Create a new directory entry (File or Directory object) from an entry in the items list. """ @@ -176,29 +168,43 @@ class Directory(FreshBase): changed = False for i in items: name = self.sanitize_filename(fn(i)) - if name: - if name in oldentries and same(oldentries[name], i): + if not name: + continue + if name in oldentries: + ent = oldentries[name] + if same(ent, i) and ent.parent_inode == self.inode: # move existing directory entry over - self._entries[name] = oldentries[name] + self._entries[name] = ent del oldentries[name] - else: - _logger.debug("Adding entry '%s' to inode %i", name, self.inode) - # create new directory entry - ent = new_entry(i) - if ent is not None: - self._entries[name] = self.inodes.add_entry(ent) - changed = True + self.inodes.inode_cache.touch(ent) + + for i in items: + name = self.sanitize_filename(fn(i)) + if not name: + continue + if name not in self._entries: + # create new directory entry + ent = new_entry(i) + if ent is not None: + self._entries[name] = self.inodes.add_entry(ent) + # need to invalidate this just in case there was a + # previous entry that couldn't be moved over or a + # lookup that returned file not found and cached + # a negative result + self.inodes.invalidate_entry(self, name) + changed = True + _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode) # delete any other directory entries that were not in found in 'items' - for i in oldentries: - _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode) - self.inodes.invalidate_entry(self, i) - self.inodes.del_entry(oldentries[i]) + for name, ent in oldentries.items(): + _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode) + self.inodes.invalidate_entry(self, name) + self.inodes.del_entry(ent) changed = True if changed: - self.inodes.invalidate_inode(self) self._mtime = time.time() + self.inodes.inode_cache.update_cache_size(self) self.fresh() @@ -210,27 +216,27 @@ class Directory(FreshBase): return True return False - def has_ref(self, only_children): - if super(Directory, self).has_ref(only_children): - return True - for v in self._entries.values(): - if v.has_ref(False): - return True - return False - def clear(self): """Delete all entries""" + if not self._entries: + return oldentries = self._entries self._entries = {} - for n in oldentries: - oldentries[n].clear() - self.inodes.del_entry(oldentries[n]) self.invalidate() + for name, ent in oldentries.items(): + ent.clear() + self.inodes.invalidate_entry(self, name) + self.inodes.del_entry(ent) + self.inodes.inode_cache.update_cache_size(self) def kernel_invalidate(self): # Invalidating the dentry on the parent implies invalidating all paths # below it as well. - parent = self.inodes[self.parent_inode] + if self.parent_inode in self.inodes: + parent = self.inodes[self.parent_inode] + else: + # parent was removed already. + return # Find self on the parent in order to invalidate this path. # Calling the public items() method might trigger a refresh, @@ -283,9 +289,10 @@ class CollectionDirectoryBase(Directory): """ - def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root): - super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters) - self.apiconfig = apiconfig + __slots__ = ("collection", "collection_root", "collection_record_file") + + def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root): + super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters) self.collection = collection self.collection_root = collection_root self.collection_record_file = None @@ -293,17 +300,16 @@ class CollectionDirectoryBase(Directory): def new_entry(self, name, item, mtime): name = self.sanitize_filename(name) if hasattr(item, "fuse_entry") and item.fuse_entry is not None: - if item.fuse_entry.dead is not True: - raise Exception("Can only reparent dead inode entry") + if item.fuse_entry.parent_inode is not None: + raise Exception("Can only reparent unparented inode entry") if item.fuse_entry.inode is None: raise Exception("Reparented entry must still have valid inode") - item.fuse_entry.dead = False + item.fuse_entry.parent_inode = self.inode self._entries[name] = item.fuse_entry elif isinstance(item, arvados.collection.RichCollectionBase): self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase( self.inode, self.inodes, - self.apiconfig, self._enable_write, self._filters, item, @@ -449,14 +455,23 @@ class CollectionDirectoryBase(Directory): def clear(self): super(CollectionDirectoryBase, self).clear() + if self.collection is not None: + self.collection.unsubscribe() self.collection = None + def objsize(self): + # objsize for the whole collection is represented at the root, + # don't double-count it + return 0 class CollectionDirectory(CollectionDirectoryBase): """Represents the root of a directory tree representing a collection.""" + __slots__ = ("api", "num_retries", "collection_locator", + "_manifest_size", "_writable", "_updating_lock") + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None): - super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self) + super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self) self.api = api self.num_retries = num_retries self._poll = True @@ -514,7 +529,10 @@ class CollectionDirectory(CollectionDirectoryBase): if self.collection_record_file is not None: self.collection_record_file.invalidate() self.inodes.invalidate_inode(self.collection_record_file) - _logger.debug("%s invalidated collection record file", self) + _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode, + self.collection_record_file.inode) + self.inodes.update_uuid(self) + self.inodes.inode_cache.update_cache_size(self) self.fresh() def uuid(self): @@ -592,6 +610,7 @@ class CollectionDirectory(CollectionDirectoryBase): return False @use_counter + @check_update def collection_record(self): self.flush() return self.collection.api_response() @@ -625,22 +644,32 @@ class CollectionDirectory(CollectionDirectoryBase): return (self.collection_locator is not None) def objsize(self): - # This is an empirically-derived heuristic to estimate the memory used - # to store this collection's metadata. Calculating the memory - # footprint directly would be more accurate, but also more complicated. - return self._manifest_size * 128 + # This is a rough guess of the amount of overhead involved for + # a collection; the assumptions are that that each file + # averages 128 bytes in the manifest, but consume 1024 bytes + # of Python data structures, so 1024/128=8 means we estimate + # the RAM footprint at 8 times the size of bare manifest text. + return self._manifest_size * 8 def finalize(self): - if self.collection is not None: - if self.writable(): + if self.collection is None: + return + + if self.writable(): + try: self.collection.save() - self.collection.stop_threads() + except Exception as e: + _logger.exception("Failed to save collection %s", self.collection_locator) + self.collection.stop_threads() def clear(self): if self.collection is not None: self.collection.stop_threads() - super(CollectionDirectory, self).clear() self._manifest_size = 0 + super(CollectionDirectory, self).clear() + if self.collection_record_file is not None: + self.inodes.del_entry(self.collection_record_file) + self.collection_record_file = None class TmpCollectionDirectory(CollectionDirectoryBase): @@ -667,7 +696,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase): # This is always enable_write=True because it never tries to # save to the backend super(TmpCollectionDirectory, self).__init__( - parent_inode, inodes, api_client.config, True, filters, collection, self) + parent_inode, inodes, True, filters, collection, self) self.populate(self.mtime()) def on_event(self, *args, **kwargs): @@ -689,7 +718,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase): with self.collection.lock: self.collection_record_file.invalidate() self.inodes.invalidate_inode(self.collection_record_file) - _logger.debug("%s invalidated collection record", self) + _logger.debug("%s invalidated collection record", self.inode) finally: while lockcount > 0: self.collection.lock.acquire() @@ -764,7 +793,7 @@ and the directory will appear if it exists. """.lstrip() def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None): - super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters) + super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters) self.api = api self.num_retries = num_retries self.pdh_only = pdh_only @@ -863,7 +892,7 @@ class TagsDirectory(Directory): """A special directory that contains as subdirectories all tags visible to the user.""" def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60): - super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters) + super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters) self.api = api self.num_retries = num_retries self._poll = True @@ -943,7 +972,7 @@ class TagDirectory(Directory): def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag, poll=False, poll_time=60): - super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters) + super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters) self.api = api self.num_retries = num_retries self.tag = tag @@ -984,9 +1013,13 @@ class TagDirectory(Directory): class ProjectDirectory(Directory): """A special directory that contains the contents of a project.""" + __slots__ = ("api", "num_retries", "project_object", "project_object_file", + "project_uuid", "_updating_lock", + "_current_user", "_full_listing", "storage_classes", "recursively_contained") + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, project_object, poll=True, poll_time=3, storage_classes=None): - super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters) + super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters) self.api = api self.num_retries = num_retries self.project_object = project_object @@ -998,6 +1031,19 @@ class ProjectDirectory(Directory): self._current_user = None self._full_listing = False self.storage_classes = storage_classes + self.recursively_contained = False + + # Filter groups can contain themselves, which causes tools + # that walk the filesystem to get stuck in an infinite loop, + # so suppress returning a listing in that case. + if self.project_object.get("group_class") == "filter": + iter_parent_inode = parent_inode + while iter_parent_inode != llfuse.ROOT_INODE: + parent_dir = self.inodes[iter_parent_inode] + if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid: + self.recursively_contained = True + break + iter_parent_inode = parent_dir.parent_inode def want_event_subscribe(self): return True @@ -1048,7 +1094,7 @@ class ProjectDirectory(Directory): self.project_object_file = ObjectFile(self.inode, self.project_object) self.inodes.add_entry(self.project_object_file) - if not self._full_listing: + if self.recursively_contained or not self._full_listing: return True def samefn(a, i): @@ -1092,7 +1138,6 @@ class ProjectDirectory(Directory): *self._filters_for('collections', qualified=True), ], ) if obj['current_version_uuid'] == obj['uuid']) - # end with llfuse.lock_released, re-acquire lock self.merge(contents, @@ -1175,6 +1220,12 @@ class ProjectDirectory(Directory): def persisted(self): return True + def clear(self): + super(ProjectDirectory, self).clear() + if self.project_object_file is not None: + self.inodes.del_entry(self.project_object_file) + self.project_object_file = None + @use_counter @check_update def mkdir(self, name): @@ -1294,7 +1345,7 @@ class SharedDirectory(Directory): def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, exclude, poll=False, poll_time=60, storage_classes=None): - super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters) + super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters) self.api = api self.num_retries = num_retries self.current_user = api.users().current().execute(num_retries=num_retries)