X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b63c1041dd34fe496920fd035249e50edc88b095..24696c5a7411f66b2b1b1a677c60907629f209e9:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index 196bb221e9..becd66975f 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + import logging import re import time @@ -146,34 +150,53 @@ class Directory(FreshBase): # delete any other directory entries that were not in found in 'items' for i in oldentries: _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode) - self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding)) + self.inodes.invalidate_entry(self, i) self.inodes.del_entry(oldentries[i]) changed = True if changed: - self.inodes.invalidate_inode(self.inode) + self.inodes.invalidate_inode(self) self._mtime = time.time() self.fresh() - def clear(self, force=False): - """Delete all entries""" + def in_use(self): + if super(Directory, self).in_use(): + return True + for v in self._entries.itervalues(): + if v.in_use(): + return True + return False - if not self.in_use() or force: - oldentries = self._entries - self._entries = {} - for n in oldentries: - if not oldentries[n].clear(force): - self._entries = oldentries - return False - for n in oldentries: - self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) - self.inodes.del_entry(oldentries[n]) - self.inodes.invalidate_inode(self.inode) - self.invalidate() + def has_ref(self, only_children): + if super(Directory, self).has_ref(only_children): return True - else: - return False + for v in self._entries.itervalues(): + if v.has_ref(False): + return True + return False + + def clear(self): + """Delete all entries""" + oldentries = self._entries + self._entries = {} + for n in oldentries: + oldentries[n].clear() + self.inodes.del_entry(oldentries[n]) + self.invalidate() + + def kernel_invalidate(self): + # Invalidating the dentry on the parent implies invalidating all paths + # below it as well. + parent = self.inodes[self.parent_inode] + + # Find self on the parent in order to invalidate this path. + # Calling the public items() method might trigger a refresh, + # which we definitely don't want, so read the internal dict directly. + for k,v in parent._entries.items(): + if v is self: + self.inodes.invalidate_entry(parent, k) + break def mtime(self): return self._mtime @@ -248,13 +271,13 @@ class CollectionDirectoryBase(Directory): elif event == arvados.collection.DEL: ent = self._entries[name] del self._entries[name] - self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding)) + self.inodes.invalidate_entry(self, name) self.inodes.del_entry(ent) elif event == arvados.collection.MOD: if hasattr(item, "fuse_entry") and item.fuse_entry is not None: - self.inodes.invalidate_inode(item.fuse_entry.inode) + self.inodes.invalidate_inode(item.fuse_entry) elif name in self._entries: - self.inodes.invalidate_inode(self._entries[name].inode) + self.inodes.invalidate_inode(self._entries[name]) def populate(self, mtime): self._mtime = mtime @@ -320,6 +343,10 @@ class CollectionDirectoryBase(Directory): self.flush() src.flush() + def clear(self): + super(CollectionDirectoryBase, self).clear() + self.collection = None + class CollectionDirectory(CollectionDirectoryBase): """Represents the root of a directory tree representing a collection.""" @@ -370,7 +397,7 @@ class CollectionDirectory(CollectionDirectoryBase): def new_collection(self, new_collection_record, coll_reader): if self.inode: - self.clear(force=True) + self.clear() self.collection_record = new_collection_record @@ -402,7 +429,7 @@ class CollectionDirectory(CollectionDirectoryBase): if not self.stale(): return - _logger.debug("Updating %s", to_record_version) + _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) if self.collection is not None: if self.collection.known_past_version(to_record_version): _logger.debug("%s already processed %s", self.collection_locator, to_record_version) @@ -488,6 +515,12 @@ class CollectionDirectory(CollectionDirectoryBase): self.collection.save() self.collection.stop_threads() + def clear(self): + if self.collection is not None: + self.collection.stop_threads() + super(CollectionDirectory, self).clear() + self._manifest_size = 0 + class TmpCollectionDirectory(CollectionDirectoryBase): """A directory backed by an Arvados collection that never gets saved. @@ -519,7 +552,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase): if self.collection_record_file: with llfuse.lock: self.collection_record_file.invalidate() - self.inodes.invalidate_inode(self.collection_record_file.inode) + self.inodes.invalidate_inode(self.collection_record_file) _logger.debug("%s invalidated collection record", self) def collection_record(self): @@ -611,6 +644,7 @@ will appear if it exists. return False try: + e = None e = self.inodes.add_entry(CollectionDirectory( self.inode, self.inodes, self.api, self.num_retries, k)) @@ -621,12 +655,13 @@ will appear if it exists. self.inodes.del_entry(e) return True else: - self.inodes.invalidate_entry(self.inode, k) + self.inodes.invalidate_entry(self, k) self.inodes.del_entry(e) return False except Exception as ex: - _logger.debug('arv-mount exception keep %s', ex) - self.inodes.del_entry(e) + _logger.exception("arv-mount lookup '%s':", k) + if e is not None: + self.inodes.del_entry(e) return False def __getitem__(self, item): @@ -635,24 +670,14 @@ will appear if it exists. else: raise KeyError("No collection with id " + item) - def clear(self, force=False): + def clear(self): pass def want_event_subscribe(self): return not self.pdh_only -class RecursiveInvalidateDirectory(Directory): - def invalidate(self): - try: - super(RecursiveInvalidateDirectory, self).invalidate() - for a in self._entries: - self._entries[a].invalidate() - except Exception: - _logger.exception() - - -class TagsDirectory(RecursiveInvalidateDirectory): +class TagsDirectory(Directory): """A special directory that contains as subdirectories all tags visible to the user.""" def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60): @@ -661,6 +686,7 @@ class TagsDirectory(RecursiveInvalidateDirectory): self.num_retries = num_retries self._poll = True self._poll_time = poll_time + self._extra = set() def want_event_subscribe(self): return True @@ -669,15 +695,41 @@ class TagsDirectory(RecursiveInvalidateDirectory): def update(self): with llfuse.lock_released: tags = self.api.links().list( - filters=[['link_class', '=', 'tag']], - select=['name'], distinct=True + filters=[['link_class', '=', 'tag'], ["name", "!=", ""]], + select=['name'], distinct=True, limit=1000 ).execute(num_retries=self.num_retries) if "items" in tags: - self.merge(tags['items'], + self.merge(tags['items']+[{"name": n} for n in self._extra], lambda i: i['name'], lambda a, i: a.tag == i['name'], lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)) + @use_counter + @check_update + def __getitem__(self, item): + if super(TagsDirectory, self).__contains__(item): + return super(TagsDirectory, self).__getitem__(item) + with llfuse.lock_released: + tags = self.api.links().list( + filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1 + ).execute(num_retries=self.num_retries) + if tags["items"]: + self._extra.add(item) + self.update() + return super(TagsDirectory, self).__getitem__(item) + + @use_counter + @check_update + def __contains__(self, k): + if super(TagsDirectory, self).__contains__(k): + return True + try: + self[k] + return True + except KeyError: + pass + return False + class TagDirectory(Directory): """A special directory that contains as subdirectories all collections visible @@ -726,6 +778,7 @@ class ProjectDirectory(Directory): self._poll_time = poll_time self._updating_lock = threading.Lock() self._current_user = None + self._full_listing = False def want_event_subscribe(self): return True @@ -748,27 +801,35 @@ class ProjectDirectory(Directory): def uuid(self): return self.project_uuid + def items(self): + self._full_listing = True + return super(ProjectDirectory, self).items() + + def namefn(self, i): + if 'name' in i: + if i['name'] is None or len(i['name']) == 0: + return None + elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])): + # collection or subproject + return i['name'] + elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection': + # name link + return i['name'] + elif 'kind' in i and i['kind'].startswith('arvados#'): + # something else + return "{}.{}".format(i['name'], i['kind'][8:]) + else: + return None + + @use_counter def update(self): if self.project_object_file == None: self.project_object_file = ObjectFile(self.inode, self.project_object) self.inodes.add_entry(self.project_object_file) - def namefn(i): - if 'name' in i: - if i['name'] is None or len(i['name']) == 0: - return None - elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']): - # collection or subproject - return i['name'] - elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection': - # name link - return i['name'] - elif 'kind' in i and i['kind'].startswith('arvados#'): - # something else - return "{}.{}".format(i['name'], i['kind'][8:]) - else: - return None + if not self._full_listing: + return def samefn(a, i): if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory): @@ -790,31 +851,62 @@ class ProjectDirectory(Directory): self.project_object = self.api.users().get( uuid=self.project_uuid).execute(num_retries=self.num_retries) - contents = arvados.util.list_all(self.api.groups().contents, - self.num_retries, uuid=self.project_uuid) + contents = arvados.util.list_all(self.api.groups().list, + self.num_retries, + filters=[["owner_uuid", "=", self.project_uuid], + ["group_class", "=", "project"]]) + contents.extend(arvados.util.list_all(self.api.collections().list, + self.num_retries, + filters=[["owner_uuid", "=", self.project_uuid]])) # end with llfuse.lock_released, re-acquire lock self.merge(contents, - namefn, + self.namefn, samefn, self.createDirectory) finally: self._updating_lock.release() + def _add_entry(self, i, name): + ent = self.createDirectory(i) + self._entries[name] = self.inodes.add_entry(ent) + return self._entries[name] + @use_counter @check_update - def __getitem__(self, item): - if item == '.arvados#project': + def __getitem__(self, k): + if k == '.arvados#project': return self.project_object_file - else: - return super(ProjectDirectory, self).__getitem__(item) + elif self._full_listing or super(ProjectDirectory, self).__contains__(k): + return super(ProjectDirectory, self).__getitem__(k) + with llfuse.lock_released: + contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid], + ["group_class", "=", "project"], + ["name", "=", k]], + limit=1).execute(num_retries=self.num_retries)["items"] + if not contents: + contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid], + ["name", "=", k]], + limit=1).execute(num_retries=self.num_retries)["items"] + if contents: + name = sanitize_filename(self.namefn(contents[0])) + if name != k: + raise KeyError(k) + return self._add_entry(contents[0], name) + + # Didn't find item + raise KeyError(k) def __contains__(self, k): if k == '.arvados#project': return True - else: - return super(ProjectDirectory, self).__contains__(k) + try: + self[k] + return True + except KeyError: + pass + return False @use_counter @check_update @@ -878,7 +970,51 @@ class ProjectDirectory(Directory): # Acually move the entry from source directory to this directory. del src._entries[name_old] self._entries[name_new] = ent - self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding)) + self.inodes.invalidate_entry(src, name_old) + + @use_counter + def child_event(self, ev): + properties = ev.get("properties") or {} + old_attrs = properties.get("old_attributes") or {} + new_attrs = properties.get("new_attributes") or {} + old_attrs["uuid"] = ev["object_uuid"] + new_attrs["uuid"] = ev["object_uuid"] + old_name = sanitize_filename(self.namefn(old_attrs)) + new_name = sanitize_filename(self.namefn(new_attrs)) + + # create events will have a new name, but not an old name + # delete events will have an old name, but not a new name + # update events will have an old and new name, and they may be same or different + # if they are the same, an unrelated field changed and there is nothing to do. + + if old_attrs.get("owner_uuid") != self.project_uuid: + # Was moved from somewhere else, so don't try to remove entry. + old_name = None + if ev.get("object_owner_uuid") != self.project_uuid: + # Was moved to somewhere else, so don't try to add entry + new_name = None + + if old_attrs.get("is_trashed"): + # Was previously deleted + old_name = None + if new_attrs.get("is_trashed"): + # Has been deleted + new_name = None + + if new_name != old_name: + ent = None + if old_name in self._entries: + ent = self._entries[old_name] + del self._entries[old_name] + self.inodes.invalidate_entry(self, old_name) + + if new_name: + if ent is not None: + self._entries[new_name] = ent + else: + self._add_entry(new_attrs, new_name) + elif ent is not None: + self.inodes.del_entry(ent) class SharedDirectory(Directory):