X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3134405ebc155a8a51738b7c6d0d4be348c65087..8dedd02357a95a0ae2c7961c8f1d0896b6311b3b:/services/fuse/arvados_fuse/fusedir.py

diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index a2e33c7b3b..9c78805107 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -26,7 +26,7 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 # Match any character which FUSE or Linux cannot accommodate as part
 # of a filename. (If present in a collection filename, they will
 # appear as underscores in the fuse mount.)
-_disallowed_filename_characters = re.compile('[\x00/]')
+_disallowed_filename_characters = re.compile(r'[\x00/]')
 
 
 class Directory(FreshBase):
@@ -36,7 +36,9 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
-    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
+    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
+
+    def __init__(self, parent_inode, inodes, enable_write, filters):
         """parent_inode is the integer inode number"""
 
         super(Directory, self).__init__()
@@ -46,28 +48,26 @@ class Directory(FreshBase):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
         self.inodes = inodes
-        self.apiconfig = apiconfig
         self._entries = {}
         self._mtime = time.time()
         self._enable_write = enable_write
-
-    def forward_slash_subst(self):
-        if not hasattr(self, '_fsns'):
-            self._fsns = None
-            config = self.apiconfig()
-            try:
-                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
-            except KeyError:
-                # old API server with no FSNS config
-                self._fsns = '_'
+        self._filters = filters or []
+
+    def _filters_for(self, subtype, *, qualified):
+        for f in self._filters:
+            f_type, _, f_name = f[0].partition('.')
+            if not f_name:
+                yield f
+            elif f_type != subtype:
+                pass
+            elif qualified:
+                yield f
             else:
-                if self._fsns == '' or self._fsns == '/':
-                    self._fsns = None
-        return self._fsns
+                yield [f_name, *f[1:]]
 
     def unsanitize_filename(self, incoming):
         """Replace ForwardSlashNameSubstitution value with /"""
-        fsns = self.forward_slash_subst()
+        fsns = self.inodes.forward_slash_subst()
         if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
         else:
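(A minimal standalone sketch of the filter routing implemented by the _filters_for generator added above. It is not part of the commit; the sample filter values are made up, but the list-of-[attribute, operator, operand] triples match the Arvados API filter format.)

MOUNT_FILTERS = [
    ['collections.name', 'like', 'sample%'],  # qualified: collections only
    ['groups.group_class', '=', 'project'],   # qualified: groups only
    ['uuid', 'is_a', 'arvados#collection'],   # unqualified: applies to every subtype
]

def filters_for(filters, subtype, *, qualified):
    # Same partition logic as Directory._filters_for above.
    for f in filters:
        f_type, _, f_name = f[0].partition('.')
        if not f_name:
            yield f                 # unqualified filters always apply
        elif f_type != subtype:
            pass                    # qualified for some other resource type
        elif qualified:
            yield f                 # keep the "groups.name" form
        else:
            yield [f_name, *f[1:]]  # strip the prefix for type-scoped list calls

print(list(filters_for(MOUNT_FILTERS, 'collections', qualified=False)))
# -> [['name', 'like', 'sample%'], ['uuid', 'is_a', 'arvados#collection']]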
@@ -86,7 +86,7 @@ class Directory(FreshBase):
         elif dirty == '..':
             return '__'
         else:
-            fsns = self.forward_slash_subst()
+            fsns = self.inodes.forward_slash_subst()
             if isinstance(fsns, str):
                 dirty = dirty.replace('/', fsns)
             return _disallowed_filename_characters.sub('_', dirty)
@@ -137,6 +137,10 @@ class Directory(FreshBase):
             self.inodes.touch(self)
             super(Directory, self).fresh()
 
+    def objsize(self):
+        # Rough estimate of memory footprint, derived from measurements with pympler
+        return len(self._entries) * 1024
+
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
 
@@ -144,16 +148,17 @@ class Directory(FreshBase):
         entries that are the same in both the old and new lists, creates new
         entries, and deletes old entries missing from the new list.
 
-        :items: iterable with new directory contents
+        Arguments:
+        * items: Iterable --- New directory contents
 
-        :fn: function to take an entry in 'items' and return the desired file or
+        * fn: Callable --- Takes an entry in 'items' and returns the desired file or
         directory name, or None if this entry should be skipped
 
-        :same: function to compare an existing entry (a File or Directory
+        * same: Callable --- Compares an existing entry (a File or Directory
         object) with an entry in the items list to determine whether to keep
         the existing entry.
 
-        :new_entry: function to create a new directory entry (File or Directory
+        * new_entry: Callable --- Creates a new directory entry (File or Directory
         object) from an entry in the items list.
 
         """
@@ -163,29 +168,43 @@ class Directory(FreshBase):
         changed = False
         for i in items:
             name = self.sanitize_filename(fn(i))
-            if name:
-                if name in oldentries and same(oldentries[name], i):
+            if not name:
+                continue
+            if name in oldentries:
+                ent = oldentries[name]
+                if same(ent, i) and ent.parent_inode == self.inode:
                     # move existing directory entry over
-                    self._entries[name] = oldentries[name]
+                    self._entries[name] = ent
                     del oldentries[name]
-                else:
-                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
-                    # create new directory entry
-                    ent = new_entry(i)
-                    if ent is not None:
-                        self._entries[name] = self.inodes.add_entry(ent)
-                        changed = True
+                    self.inodes.inode_cache.touch(ent)
+
+        for i in items:
+            name = self.sanitize_filename(fn(i))
+            if not name:
+                continue
+            if name not in self._entries:
+                # create new directory entry
+                ent = new_entry(i)
+                if ent is not None:
+                    self._entries[name] = self.inodes.add_entry(ent)
+                    # need to invalidate this just in case there was a
+                    # previous entry that couldn't be moved over or a
+                    # lookup that returned file not found and cached
+                    # a negative result
+                    self.inodes.invalidate_entry(self, name)
+                    changed = True
+                    _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
 
         # delete any other directory entries that were not found in 'items'
-        for i in oldentries:
-            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
-            self.inodes.invalidate_entry(self, i)
-            self.inodes.del_entry(oldentries[i])
+        for name, ent in oldentries.items():
+            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
             changed = True
 
         if changed:
-            self.inodes.invalidate_inode(self)
             self._mtime = time.time()
+            self.inodes.inode_cache.update_cache_size(self)
 
         self.fresh()
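(A standalone sketch of the two-pass reconcile that merge() now performs above: pass one carries over unchanged entries, pass two creates whatever is still missing, and the leftovers are detached. Plain dicts stand in for the inode machinery; the names are illustrative only, not Arvados API.)

def merge(entries, items, fn, same, new_entry):
    old = entries.copy()
    entries.clear()
    # Pass 1: carry over entries that are the same in the old and new lists.
    for item in items:
        name = fn(item)
        if name in old and same(old[name], item):
            entries[name] = old.pop(name)
    # Pass 2: create entries for anything not carried over.
    for item in items:
        name = fn(item)
        if name not in entries:
            entries[name] = new_entry(item)
    # Whatever is left in 'old' was not claimed: the caller invalidates and deletes it.
    return old

cache = {'a.txt': 1}
leftovers = merge(cache, [{'name': 'a.txt'}, {'name': 'b.txt'}],
                  fn=lambda i: i['name'],
                  same=lambda ent, i: True,
                  new_entry=lambda i: 0)
print(cache, leftovers)  # {'a.txt': 1, 'b.txt': 0} {}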
@@ -197,27 +216,27 @@ class Directory(FreshBase):
                 return True
         return False
 
-    def has_ref(self, only_children):
-        if super(Directory, self).has_ref(only_children):
-            return True
-        for v in self._entries.values():
-            if v.has_ref(False):
-                return True
-        return False
-
     def clear(self):
         """Delete all entries"""
+        if not self._entries:
+            return
         oldentries = self._entries
         self._entries = {}
-        for n in oldentries:
-            oldentries[n].clear()
-            self.inodes.del_entry(oldentries[n])
         self.invalidate()
+        for name, ent in oldentries.items():
+            ent.clear()
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
+        self.inodes.inode_cache.update_cache_size(self)
 
     def kernel_invalidate(self):
         # Invalidating the dentry on the parent implies invalidating all paths
         # below it as well.
-        parent = self.inodes[self.parent_inode]
+        if self.parent_inode in self.inodes:
+            parent = self.inodes[self.parent_inode]
+        else:
+            # parent was removed already.
+            return
 
         # Find self on the parent in order to invalidate this path.
         # Calling the public items() method might trigger a refresh,
@@ -270,28 +289,42 @@ class CollectionDirectoryBase(Directory):
     """
 
-    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection):
-        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
-        self.apiconfig = apiconfig
+    __slots__ = ("collection", "collection_root", "collection_record_file")
+
+    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
+        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
         self.collection = collection
+        self.collection_root = collection_root
+        self.collection_record_file = None
 
     def new_entry(self, name, item, mtime):
         name = self.sanitize_filename(name)
         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
-            if item.fuse_entry.dead is not True:
-                raise Exception("Can only reparent dead inode entry")
+            if item.fuse_entry.parent_inode is not None:
+                raise Exception("Can only reparent unparented inode entry")
             if item.fuse_entry.inode is None:
                 raise Exception("Reparented entry must still have valid inode")
-            item.fuse_entry.dead = False
+            item.fuse_entry.parent_inode = self.inode
             self._entries[name] = item.fuse_entry
         elif isinstance(item, arvados.collection.RichCollectionBase):
-            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item))
+            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
+                self.inode,
+                self.inodes,
+                self._enable_write,
+                self._filters,
+                item,
+                self.collection_root,
+            ))
             self._entries[name].populate(mtime)
         else:
             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
         item.fuse_entry = self._entries[name]
 
     def on_event(self, event, collection, name, item):
+        # These are events from the Collection object (ADD/DEL/MOD),
+        # emitted by operations on the Collection object (like
+        # "mkdirs" or "remove") and by "update".  We need to keep them
+        # synchronized with our FUSE objects that have been assigned inodes.
         if collection == self.collection:
             name = self.sanitize_filename(name)
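(For readers unfamiliar with the callback documented in the comment above: a small sketch of Collection event subscription as this module appears to use it. It assumes the arvados Python SDK's Collection API; the file name is made up.)

import arvados.collection

def on_event(event, collection, name, item):
    # event is arvados.collection.ADD, DEL, MOD or TOK; name is the
    # affected entry; item is the File or Subcollection object.
    print(event, name)

coll = arvados.collection.Collection()  # empty in-memory collection
coll.subscribe(on_event)
with coll.open('hello.txt', 'w') as f:  # emits events through the callback
    f.write('hi')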
@@ -336,6 +369,10 @@ class CollectionDirectoryBase(Directory):
                         self.inodes.invalidate_inode(item.fuse_entry)
                     elif name in self._entries:
                         self.inodes.invalidate_inode(self._entries[name])
+
+                    if self.collection_record_file is not None:
+                        self.collection_record_file.invalidate()
+                        self.inodes.invalidate_inode(self.collection_record_file)
             finally:
                 while lockcount > 0:
                     self.collection.lock.acquire()
                     lockcount -= 1
@@ -353,10 +390,7 @@ class CollectionDirectoryBase(Directory):
 
     @use_counter
     def flush(self):
-        if not self.writable():
-            return
-        with llfuse.lock_released:
-            self.collection.root_collection().save()
+        self.collection_root.flush()
 
     @use_counter
     @check_update
@@ -421,18 +455,25 @@ class CollectionDirectoryBase(Directory):
 
     def clear(self):
         super(CollectionDirectoryBase, self).clear()
+        if self.collection is not None:
+            self.collection.unsubscribe()
         self.collection = None
 
+    def objsize(self):
+        # objsize for the whole collection is represented at the root,
+        # don't double-count it
+        return 0
+
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
 
-    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
-        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None)
+    __slots__ = ("api", "num_retries", "collection_locator",
+                 "_manifest_size", "_writable", "_updating_lock")
+
+    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
         self.api = api
         self.num_retries = num_retries
-        self.collection_record_file = None
-        self.collection_record = None
         self._poll = True
         try:
             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
@@ -457,62 +498,73 @@ class CollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
 
+    @use_counter
+    def flush(self):
+        if not self.writable():
+            return
+        with llfuse.lock_released:
+            with self._updating_lock:
+                if self.collection.committed():
+                    self.collection.update()
+                else:
+                    self.collection.save()
+                self.new_collection_record(self.collection.api_response())
+
     def want_event_subscribe(self):
         return (uuid_pattern.match(self.collection_locator) is not None)
 
-    # Used by arv-web.py to switch the contents of the CollectionDirectory
-    def change_collection(self, new_locator):
-        """Switch the contents of the CollectionDirectory.
-
-        Must be called with llfuse.lock held.
- """ - - self.collection_locator = new_locator - self.collection_record = None - self.update() - def new_collection(self, new_collection_record, coll_reader): if self.inode: self.clear() - - self.collection_record = new_collection_record - - if self.collection_record: - self._mtime = convertTime(self.collection_record.get('modified_at')) - self.collection_locator = self.collection_record["uuid"] - if self.collection_record_file is not None: - self.collection_record_file.update(self.collection_record) - self.collection = coll_reader + self.new_collection_record(new_collection_record) self.populate(self.mtime()) + def new_collection_record(self, new_collection_record): + if not new_collection_record: + raise Exception("invalid new_collection_record") + self._mtime = convertTime(new_collection_record.get('modified_at')) + self._manifest_size = len(new_collection_record["manifest_text"]) + self.collection_locator = new_collection_record["uuid"] + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode, + self.collection_record_file.inode) + self.inodes.update_uuid(self) + self.inodes.inode_cache.update_cache_size(self) + self.fresh() + def uuid(self): return self.collection_locator @use_counter - def update(self, to_record_version=None): + def update(self): try: - if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator): + if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator): + # It's immutable, nothing to update return True if self.collection_locator is None: + # No collection locator to retrieve from self.fresh() return True + new_collection_record = None try: with llfuse.lock_released: self._updating_lock.acquire() if not self.stale(): - return + return True - _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) - new_collection_record = None + _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode) + coll_reader = None if self.collection is not None: - if self.collection.known_past_version(to_record_version): - _logger.debug("%s already processed %s", self.collection_locator, to_record_version) - else: - self.collection.update() + # Already have a collection object + self.collection.update() + new_collection_record = self.collection.api_response() else: + # Create a new collection object if uuid_pattern.match(self.collection_locator): coll_reader = arvados.collection.Collection( self.collection_locator, self.api, self.api.keep, @@ -534,14 +586,13 @@ class CollectionDirectory(CollectionDirectoryBase): new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired() # end with llfuse.lock_released, re-acquire lock - if (new_collection_record is not None and - (self.collection_record is None or - self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))): - self.new_collection(new_collection_record, coll_reader) - self._manifest_size = len(coll_reader.manifest_text()) - _logger.debug("%s manifest_size %i", self, self._manifest_size) - self.fresh() + if new_collection_record is not None: + if coll_reader is not None: + self.new_collection(new_collection_record, coll_reader) + else: + self.new_collection_record(new_collection_record) + return True finally: 
                self._updating_lock.release()
@@ -549,22 +600,30 @@ class CollectionDirectory(CollectionDirectoryBase):
             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
-            if self.collection_record is not None and "manifest_text" in self.collection_record:
-                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+            if new_collection_record is not None and "manifest_text" in new_collection_record:
+                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
         except Exception:
             _logger.exception("arv-mount %s: error", self.collection_locator)
-            if self.collection_record is not None and "manifest_text" in self.collection_record:
-                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+            if new_collection_record is not None and "manifest_text" in new_collection_record:
+                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
         self.invalidate()
         return False
 
+    @use_counter
+    @check_update
+    def collection_record(self):
+        self.flush()
+        return self.collection.api_response()
+
     @use_counter
     @check_update
     def __getitem__(self, item):
         if item == '.arvados#collection':
             if self.collection_record_file is None:
-                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+                self.collection_record_file = FuncToJSONFile(
+                    self.inode, self.collection_record)
                 self.inodes.add_entry(self.collection_record_file)
+                self.invalidate()  # use lookup as a signal to force update
             return self.collection_record_file
         else:
             return super(CollectionDirectory, self).__getitem__(item)
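(A usage sketch of the '.arvados#collection' magic file served by __getitem__ above: every collection directory in an arv-mount exposes its API record as a virtual JSON file. The mount point and UUID below are hypothetical.)

import json

record_path = '/mnt/arvados/by_id/zzzzz-4zz18-zzzzzzzzzzzzzzz/.arvados#collection'
with open(record_path) as f:   # reading it returns the collection's API record,
    record = json.load(f)      # refreshed per the invalidate-on-lookup logic above
print(record['uuid'], record['portable_data_hash'])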
@@ -576,30 +635,41 @@ class CollectionDirectory(CollectionDirectoryBase):
         return super(CollectionDirectory, self).__contains__(k)
 
     def invalidate(self):
-        self.collection_record = None
-        self.collection_record_file = None
+        if self.collection_record_file is not None:
+            self.collection_record_file.invalidate()
+            self.inodes.invalidate_inode(self.collection_record_file)
         super(CollectionDirectory, self).invalidate()
 
     def persisted(self):
         return (self.collection_locator is not None)
 
     def objsize(self):
-        # This is an empirically-derived heuristic to estimate the memory used
-        # to store this collection's metadata. Calculating the memory
-        # footprint directly would be more accurate, but also more complicated.
-        return self._manifest_size * 128
+        # This is a rough guess of the amount of overhead involved for
+        # a collection; the assumption is that each file averages 128
+        # bytes in the manifest, but consumes 1024 bytes of Python data
+        # structures, so 1024/128=8 means we estimate the RAM footprint
+        # at 8 times the size of the bare manifest text.
+        return self._manifest_size * 8
 
     def finalize(self):
-        if self.collection is not None:
-            if self.writable():
+        if self.collection is None:
+            return
+
+        if self.writable():
+            try:
                 self.collection.save()
-            self.collection.stop_threads()
+            except Exception as e:
+                _logger.exception("Failed to save collection %s", self.collection_locator)
+        self.collection.stop_threads()
 
     def clear(self):
         if self.collection is not None:
             self.collection.stop_threads()
-        super(CollectionDirectory, self).clear()
         self._manifest_size = 0
+        super(CollectionDirectory, self).clear()
+        if self.collection_record_file is not None:
+            self.inodes.del_entry(self.collection_record_file)
+            self.collection_record_file = None
 
 
 class TmpCollectionDirectory(CollectionDirectoryBase):
@@ -617,7 +687,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
         def save_new(self):
             pass
 
-    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
+    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
         collection = self.UnsaveableCollection(
             api_client=api_client,
             keep_client=api_client.keep,
@@ -626,33 +696,33 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
         # This is always enable_write=True because it never tries to
         # save to the backend
         super(TmpCollectionDirectory, self).__init__(
-            parent_inode, inodes, api_client.config, True, collection)
-        self.collection_record_file = None
+            parent_inode, inodes, True, filters, collection, self)
         self.populate(self.mtime())
 
     def on_event(self, *args, **kwargs):
         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
-        if self.collection_record_file:
+        if self.collection_record_file is None:
+            return
 
-            # See discussion in CollectionDirectoryBase.on_event
-            lockcount = 0
-            try:
-                while True:
-                    self.collection.lock.release()
-                    lockcount += 1
-            except RuntimeError:
-                pass
+        # See discussion in CollectionDirectoryBase.on_event
+        lockcount = 0
+        try:
+            while True:
+                self.collection.lock.release()
+                lockcount += 1
+        except RuntimeError:
+            pass
 
-            try:
-                with llfuse.lock:
-                    with self.collection.lock:
-                        self.collection_record_file.invalidate()
-                        self.inodes.invalidate_inode(self.collection_record_file)
-                        _logger.debug("%s invalidated collection record", self)
-            finally:
-                while lockcount > 0:
-                    self.collection.lock.acquire()
-                    lockcount -= 1
+        try:
+            with llfuse.lock:
+                with self.collection.lock:
+                    self.collection_record_file.invalidate()
+                    self.inodes.invalidate_inode(self.collection_record_file)
+                    _logger.debug("%s invalidated collection record", self.inode)
+        finally:
+            while lockcount > 0:
+                self.collection.lock.acquire()
+                lockcount -= 1
 
     def collection_record(self):
         with llfuse.lock_released:
@@ -683,6 +753,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return True
 
+    def flush(self):
+        pass
+
     def want_event_subscribe(self):
         return False
 
@@ -719,8 +792,8 @@ and the directory will appear if it exists.
 """.lstrip()
 
-    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
-        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
+        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.pdh_only = pdh_only
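(A usage sketch of the MagicDirectory behavior described by the README text above: a directory springs into existence when first looked up by collection UUID or portable data hash. The mount point and UUID are hypothetical.)

import os

mount = '/mnt/arvados/by_id'          # hypothetical arv-mount point
uuid = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'  # made-up collection UUID
path = os.path.join(mount, uuid)
print(os.listdir(path))               # the lookup makes the directory appear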
@@ -736,8 +809,14 @@ and the directory will appear if it exists.
         # If we're the root directory, add an identical by_id subdirectory.
         if self.inode == llfuse.ROOT_INODE:
             self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
-                self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
-                self.pdh_only))
+                self.inode,
+                self.inodes,
+                self.api,
+                self.num_retries,
+                self._enable_write,
+                self._filters,
+                self.pdh_only,
+            ))
 
     def __contains__(self, k):
         if k in self._entries:
@@ -751,15 +830,34 @@ and the directory will appear if it exists.
             if group_uuid_pattern.match(k):
                 project = self.api.groups().list(
-                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
+                    filters=[
+                        ['group_class', 'in', ['project','filter']],
+                        ["uuid", "=", k],
+                        *self._filters_for('groups', qualified=False),
+                    ],
+                ).execute(num_retries=self.num_retries)
                 if project[u'items_available'] == 0:
                     return False
                 e = self.inodes.add_entry(ProjectDirectory(
-                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
-                    project[u'items'][0], storage_classes=self.storage_classes))
+                    self.inode,
+                    self.inodes,
+                    self.api,
+                    self.num_retries,
+                    self._enable_write,
+                    self._filters,
+                    project[u'items'][0],
+                    storage_classes=self.storage_classes,
+                ))
             else:
                 e = self.inodes.add_entry(CollectionDirectory(
-                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))
+                    self.inode,
+                    self.inodes,
+                    self.api,
+                    self.num_retries,
+                    self._enable_write,
+                    self._filters,
+                    k,
+                ))
 
             if e.update():
                 if k not in self._entries:
@@ -793,8 +891,8 @@ class TagsDirectory(Directory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
-    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
+        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -808,15 +906,32 @@ class TagsDirectory(Directory):
     def update(self):
         with llfuse.lock_released:
             tags = self.api.links().list(
-                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
-                select=['name'], distinct=True, limit=1000
-            ).execute(num_retries=self.num_retries)
+                filters=[
+                    ['link_class', '=', 'tag'],
+                    ['name', '!=', ''],
+                    *self._filters_for('links', qualified=False),
+                ],
+                select=['name'],
+                distinct=True,
+                limit=1000,
+            ).execute(num_retries=self.num_retries)
         if "items" in tags:
-            self.merge(tags['items']+[{"name": n} for n in self._extra],
-                       lambda i: i['name'],
-                       lambda a, i: a.tag == i['name'],
-                       lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
-                                              i['name'], poll=self._poll, poll_time=self._poll_time))
+            self.merge(
+                tags['items']+[{"name": n} for n in self._extra],
+                lambda i: i['name'],
+                lambda a, i: a.tag == i['name'],
+                lambda i: TagDirectory(
+                    self.inode,
+                    self.inodes,
+                    self.api,
+                    self.num_retries,
+                    self._enable_write,
+                    self._filters,
+                    i['name'],
+                    poll=self._poll,
+                    poll_time=self._poll_time,
+                ),
+            )
 
     @use_counter
     @check_update
@@ -825,7 +940,12 @@ class TagsDirectory(Directory):
             return super(TagsDirectory, self).__getitem__(item)
         with llfuse.lock_released:
             tags = self.api.links().list(
-                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
+                filters=[
+                    ['link_class', '=', 'tag'],
+                    ['name', '=', item],
+                    *self._filters_for('links', qualified=False),
+                ],
+                limit=1,
             ).execute(num_retries=self.num_retries)
             if tags["items"]:
                 self._extra.add(item)
@@ -850,9 +970,9 @@ class TagDirectory(Directory):
     """A special directory that contains as subdirectories all collections visible
     to the user that are tagged with a particular tag.
     """
 
-    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
-                 poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
+                 poll=False, poll_time=60):
+        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
@@ -866,23 +986,40 @@ class TagDirectory(Directory):
     def update(self):
         with llfuse.lock_released:
             taggedcollections = self.api.links().list(
-                filters=[['link_class', '=', 'tag'],
-                         ['name', '=', self.tag],
-                         ['head_uuid', 'is_a', 'arvados#collection']],
-                select=['head_uuid']
-            ).execute(num_retries=self.num_retries)
-        self.merge(taggedcollections['items'],
-                   lambda i: i['head_uuid'],
-                   lambda a, i: a.collection_locator == i['head_uuid'],
-                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
+                filters=[
+                    ['link_class', '=', 'tag'],
+                    ['name', '=', self.tag],
+                    ['head_uuid', 'is_a', 'arvados#collection'],
+                    *self._filters_for('links', qualified=False),
+                ],
+                select=['head_uuid'],
+            ).execute(num_retries=self.num_retries)
+        self.merge(
+            taggedcollections['items'],
+            lambda i: i['head_uuid'],
+            lambda a, i: a.collection_locator == i['head_uuid'],
+            lambda i: CollectionDirectory(
+                self.inode,
+                self.inodes,
+                self.api,
+                self.num_retries,
+                self._enable_write,
+                self._filters,
+                i['head_uuid'],
+            ),
+        )
 
 
 class ProjectDirectory(Directory):
     """A special directory that contains the contents of a project."""
 
-    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
-                 poll=True, poll_time=3, storage_classes=None):
-        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
+                 "project_uuid", "_updating_lock",
+                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
+
+    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
+                 project_object, poll=True, poll_time=3, storage_classes=None):
+        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
@@ -894,19 +1031,32 @@ class ProjectDirectory(Directory):
         self._current_user = None
         self._full_listing = False
         self.storage_classes = storage_classes
+        self.recursively_contained = False
+
+        # Filter groups can contain themselves, which causes tools
+        # that walk the filesystem to get stuck in an infinite loop,
+        # so suppress returning a listing in that case.
+        if self.project_object.get("group_class") == "filter":
+            iter_parent_inode = parent_inode
+            while iter_parent_inode != llfuse.ROOT_INODE:
+                parent_dir = self.inodes[iter_parent_inode]
+                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
+                    self.recursively_contained = True
+                    break
+                iter_parent_inode = parent_dir.parent_inode
 
     def want_event_subscribe(self):
         return True
 
     def createDirectory(self, i):
+        common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
         if collection_uuid_pattern.match(i['uuid']):
-            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
+            return CollectionDirectory(*common_args, i)
         elif group_uuid_pattern.match(i['uuid']):
-            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
-                                    i, self._poll, self._poll_time, self.storage_classes)
+            return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
         elif link_uuid_pattern.match(i['uuid']):
             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
-                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
+                return CollectionDirectory(*common_args, i['head_uuid'])
             else:
                 return None
         elif uuid_pattern.match(i['uuid']):
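(A self-contained sketch of the parent-walk cycle check added in ProjectDirectory.__init__ above, with a plain dict standing in for the inode table. Because a filter group can match itself, an ancestor with the same project UUID means a listing would recurse forever.)

ROOT_INODE = 1
inodes = {
    1: {'uuid': None,     'parent': None},  # root
    2: {'uuid': 'proj-A', 'parent': 1},
    3: {'uuid': 'proj-A', 'parent': 2},     # filter group that contains itself
}

def recursively_contained(inode):
    uuid = inodes[inode]['uuid']
    parent = inodes[inode]['parent']
    # Walk parent links back toward the root, looking for the same project.
    while parent is not None and parent != ROOT_INODE:
        if inodes[parent]['uuid'] == uuid:
            return True
        parent = inodes[parent]['parent']
    return False

print(recursively_contained(3))  # True: suppress the listing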
@@ -944,7 +1094,7 @@ class ProjectDirectory(Directory):
             self.project_object_file = ObjectFile(self.inode, self.project_object)
             self.inodes.add_entry(self.project_object_file)
 
-        if not self._full_listing:
+        if self.recursively_contained or not self._full_listing:
             return True
 
         def samefn(a, i):
@@ -967,18 +1117,27 @@ class ProjectDirectory(Directory):
                     self.project_object = self.api.users().get(
                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
                 # do this in 2 steps until #17424 is fixed
-                contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
-                                                             order_key="uuid",
-                                                             num_retries=self.num_retries,
-                                                             uuid=self.project_uuid,
-                                                             filters=[["uuid", "is_a", "arvados#group"],
-                                                                      ["groups.group_class", "in", ["project","filter"]]]))
-                contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
-                                                             order_key="uuid",
-                                                             num_retries=self.num_retries,
-                                                             uuid=self.project_uuid,
-                                                             filters=[["uuid", "is_a", "arvados#collection"]]))
-
+                contents = list(arvados.util.keyset_list_all(
+                    self.api.groups().contents,
+                    order_key='uuid',
+                    num_retries=self.num_retries,
+                    uuid=self.project_uuid,
+                    filters=[
+                        ['uuid', 'is_a', 'arvados#group'],
+                        ['groups.group_class', 'in', ['project', 'filter']],
+                        *self._filters_for('groups', qualified=True),
+                    ],
+                ))
+                contents.extend(obj for obj in arvados.util.keyset_list_all(
+                    self.api.groups().contents,
+                    order_key='uuid',
+                    num_retries=self.num_retries,
+                    uuid=self.project_uuid,
+                    filters=[
+                        ['uuid', 'is_a', 'arvados#collection'],
+                        *self._filters_for('collections', qualified=True),
+                    ],
+                ) if obj['current_version_uuid'] == obj['uuid'])
 
             # end with llfuse.lock_released, re-acquire lock
             self.merge(contents,
@@ -1007,14 +1166,24 @@ class ProjectDirectory(Directory):
                 namefilter = ["name", "=", k]
             else:
                 namefilter = ["name", "in", [k, k2]]
-            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                       ["group_class", "in", ["project","filter"]],
-                                                       namefilter],
-                                              limit=2).execute(num_retries=self.num_retries)["items"]
+            contents = self.api.groups().list(
+                filters=[
+                    ["owner_uuid", "=", self.project_uuid],
+                    ["group_class", "in", ["project","filter"]],
["project","filter"]], + namefilter, + *self._filters_for('groups', qualified=False), + ], + limit=2, + ).execute(num_retries=self.num_retries)["items"] if not contents: - contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid], - namefilter], - limit=2).execute(num_retries=self.num_retries)["items"] + contents = self.api.collections().list( + filters=[ + ["owner_uuid", "=", self.project_uuid], + namefilter, + *self._filters_for('collections', qualified=False), + ], + limit=2, + ).execute(num_retries=self.num_retries)["items"] if contents: if len(contents) > 1 and contents[1]['name'] == k: # If "foo/bar" and "foo[SUBST]bar" both exist, use @@ -1051,6 +1220,12 @@ class ProjectDirectory(Directory): def persisted(self): return True + def clear(self): + super(ProjectDirectory, self).clear() + if self.project_object_file is not None: + self.inodes.del_entry(self.project_object_file) + self.project_object_file = None + @use_counter @check_update def mkdir(self, name): @@ -1168,9 +1343,9 @@ class ProjectDirectory(Directory): class SharedDirectory(Directory): """A special directory that represents users or groups who have shared projects with me.""" - def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude, - poll=False, poll_time=60, storage_classes=None): - super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write) + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, + exclude, poll=False, poll_time=60, storage_classes=None): + super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters) self.api = api self.num_retries = num_retries self.current_user = api.users().current().execute(num_retries=num_retries) @@ -1196,11 +1371,17 @@ class SharedDirectory(Directory): if 'httpMethod' in methods.get('shared', {}): page = [] while True: - resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page, - order="uuid", - limit=10000, - count="none", - include="owner_uuid").execute() + resp = self.api.groups().shared( + filters=[ + ['group_class', 'in', ['project','filter']], + *page, + *self._filters_for('groups', qualified=False), + ], + order="uuid", + limit=10000, + count="none", + include="owner_uuid", + ).execute() if not resp["items"]: break page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]] @@ -1215,8 +1396,12 @@ class SharedDirectory(Directory): self.api.groups().list, order_key="uuid", num_retries=self.num_retries, - filters=[['group_class','in',['project','filter']]], - select=["uuid", "owner_uuid"])) + filters=[ + ['group_class', 'in', ['project','filter']], + *self._filters_for('groups', qualified=False), + ], + select=["uuid", "owner_uuid"], + )) for ob in all_projects: objects[ob['uuid']] = ob @@ -1230,13 +1415,20 @@ class SharedDirectory(Directory): self.api.users().list, order_key="uuid", num_retries=self.num_retries, - filters=[['uuid','in', list(root_owners)]]) + filters=[ + ['uuid', 'in', list(root_owners)], + *self._filters_for('users', qualified=False), + ], + ) lgroups = arvados.util.keyset_list_all( self.api.groups().list, order_key="uuid", num_retries=self.num_retries, - filters=[['uuid','in', list(root_owners)+roots]]) - + filters=[ + ['uuid', 'in', list(root_owners)+roots], + *self._filters_for('groups', qualified=False), + ], + ) for l in lusers: objects[l["uuid"]] = l for l in lgroups: @@ -1258,11 +1450,23 @@ class SharedDirectory(Directory): # end with llfuse.lock_released, re-acquire lock - 
-            self.merge(contents.items(),
-                       lambda i: i[0],
-                       lambda a, i: a.uuid() == i[1]['uuid'],
-                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
-                                                  i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
+            self.merge(
+                contents.items(),
+                lambda i: i[0],
+                lambda a, i: a.uuid() == i[1]['uuid'],
+                lambda i: ProjectDirectory(
+                    self.inode,
+                    self.inodes,
+                    self.api,
+                    self.num_retries,
+                    self._enable_write,
+                    self._filters,
+                    i[1],
+                    poll=self._poll,
+                    poll_time=self._poll_time,
+                    storage_classes=self.storage_classes,
+                ),
+            )
         except Exception:
             _logger.exception("arv-mount shared dir error")
         finally: