X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d3ffe252f9d7cbbad9a7bf61ccf5d26129720f43..4090574822afd6a7b48ccd277ba84c3cc6244e71:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index a2e33c7b3b..f3816c0d3e 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -270,10 +270,12 @@ class CollectionDirectoryBase(Directory): """ - def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection): + def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root): super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write) self.apiconfig = apiconfig self.collection = collection + self.collection_root = collection_root + self.collection_record_file = None def new_entry(self, name, item, mtime): name = self.sanitize_filename(name) @@ -285,13 +287,17 @@ class CollectionDirectoryBase(Directory): item.fuse_entry.dead = False self._entries[name] = item.fuse_entry elif isinstance(item, arvados.collection.RichCollectionBase): - self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item)) + self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root)) self._entries[name].populate(mtime) else: self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write)) item.fuse_entry = self._entries[name] def on_event(self, event, collection, name, item): + # These are events from the Collection object (ADD/DEL/MOD) + # emitted by operations on the Collection object (like + # "mkdirs" or "remove"), and by "update", which we need to + # synchronize with our FUSE objects that are assigned inodes. if collection == self.collection: name = self.sanitize_filename(name) @@ -336,6 +342,10 @@ class CollectionDirectoryBase(Directory): self.inodes.invalidate_inode(item.fuse_entry) elif name in self._entries: self.inodes.invalidate_inode(self._entries[name]) + + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) finally: while lockcount > 0: self.collection.lock.acquire() @@ -353,10 +363,7 @@ class CollectionDirectoryBase(Directory): @use_counter def flush(self): - if not self.writable(): - return - with llfuse.lock_released: - self.collection.root_collection().save() + self.collection_root.flush() @use_counter @check_update @@ -428,11 +435,9 @@ class CollectionDirectory(CollectionDirectoryBase): """Represents the root of a directory tree representing a collection.""" def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None): - super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None) + super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self) self.api = api self.num_retries = num_retries - self.collection_record_file = None - self.collection_record = None self._poll = True try: self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2) @@ -457,70 +462,86 @@ class CollectionDirectory(CollectionDirectoryBase): def writable(self): return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable) + @use_counter + def flush(self): + if not self.writable(): + return + with llfuse.lock_released: + with self._updating_lock: + if self.collection.committed(): + self.collection.update() + else: + self.collection.save() + self.new_collection_record(self.collection.api_response()) + def want_event_subscribe(self): return (uuid_pattern.match(self.collection_locator) is not None) - # Used by arv-web.py to switch the contents of the CollectionDirectory - def change_collection(self, new_locator): - """Switch the contents of the CollectionDirectory. - - Must be called with llfuse.lock held. - """ - - self.collection_locator = new_locator - self.collection_record = None - self.update() - def new_collection(self, new_collection_record, coll_reader): if self.inode: self.clear() - - self.collection_record = new_collection_record - - if self.collection_record: - self._mtime = convertTime(self.collection_record.get('modified_at')) - self.collection_locator = self.collection_record["uuid"] - if self.collection_record_file is not None: - self.collection_record_file.update(self.collection_record) - self.collection = coll_reader + self.new_collection_record(new_collection_record) self.populate(self.mtime()) + def new_collection_record(self, new_collection_record): + if not new_collection_record: + raise Exception("invalid new_collection_record") + self._mtime = convertTime(new_collection_record.get('modified_at')) + self._manifest_size = len(new_collection_record["manifest_text"]) + self.collection_locator = new_collection_record["uuid"] + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record file", self) + self.fresh() + def uuid(self): return self.collection_locator @use_counter - def update(self, to_record_version=None): + def update(self): try: - if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator): + if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator): + # It's immutable, nothing to update return True if self.collection_locator is None: + # No collection locator to retrieve from self.fresh() return True + new_collection_record = None try: with llfuse.lock_released: self._updating_lock.acquire() if not self.stale(): - return + return True - _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) - new_collection_record = None + _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode) + coll_reader = None if self.collection is not None: - if self.collection.known_past_version(to_record_version): - _logger.debug("%s already processed %s", self.collection_locator, to_record_version) - else: - self.collection.update() + # Already have a collection object + self.collection.update() + new_collection_record = self.collection.api_response() else: + # If there's too many prefetch threads and you + # max out the CPU, delivering data to the FUSE + # layer actually ends up being slower. + # Experimentally, capping 7 threads seems to + # be a sweet spot. + get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7) + # Create a new collection object if uuid_pattern.match(self.collection_locator): coll_reader = arvados.collection.Collection( self.collection_locator, self.api, self.api.keep, - num_retries=self.num_retries) + num_retries=self.num_retries, + get_threads=get_threads) else: coll_reader = arvados.collection.CollectionReader( self.collection_locator, self.api, self.api.keep, - num_retries=self.num_retries) + num_retries=self.num_retries, + get_threads=get_threads) new_collection_record = coll_reader.api_response() or {} # If the Collection only exists in Keep, there will be no API # response. Fill in the fields we need. @@ -534,14 +555,13 @@ class CollectionDirectory(CollectionDirectoryBase): new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired() # end with llfuse.lock_released, re-acquire lock - if (new_collection_record is not None and - (self.collection_record is None or - self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))): - self.new_collection(new_collection_record, coll_reader) - self._manifest_size = len(coll_reader.manifest_text()) - _logger.debug("%s manifest_size %i", self, self._manifest_size) - self.fresh() + if new_collection_record is not None: + if coll_reader is not None: + self.new_collection(new_collection_record, coll_reader) + else: + self.new_collection_record(new_collection_record) + return True finally: self._updating_lock.release() @@ -549,22 +569,29 @@ class CollectionDirectory(CollectionDirectoryBase): _logger.error("Error fetching collection '%s': %s", self.collection_locator, e) except arvados.errors.ArgumentError as detail: _logger.warning("arv-mount %s: error %s", self.collection_locator, detail) - if self.collection_record is not None and "manifest_text" in self.collection_record: - _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) + if new_collection_record is not None and "manifest_text" in new_collection_record: + _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"]) except Exception: _logger.exception("arv-mount %s: error", self.collection_locator) - if self.collection_record is not None and "manifest_text" in self.collection_record: - _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) + if new_collection_record is not None and "manifest_text" in new_collection_record: + _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"]) self.invalidate() return False + @use_counter + def collection_record(self): + self.flush() + return self.collection.api_response() + @use_counter @check_update def __getitem__(self, item): if item == '.arvados#collection': if self.collection_record_file is None: - self.collection_record_file = ObjectFile(self.inode, self.collection_record) + self.collection_record_file = FuncToJSONFile( + self.inode, self.collection_record) self.inodes.add_entry(self.collection_record_file) + self.invalidate() # use lookup as a signal to force update return self.collection_record_file else: return super(CollectionDirectory, self).__getitem__(item) @@ -576,8 +603,9 @@ class CollectionDirectory(CollectionDirectoryBase): return super(CollectionDirectory, self).__contains__(k) def invalidate(self): - self.collection_record = None - self.collection_record_file = None + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) super(CollectionDirectory, self).invalidate() def persisted(self): @@ -626,33 +654,33 @@ class TmpCollectionDirectory(CollectionDirectoryBase): # This is always enable_write=True because it never tries to # save to the backend super(TmpCollectionDirectory, self).__init__( - parent_inode, inodes, api_client.config, True, collection) - self.collection_record_file = None + parent_inode, inodes, api_client.config, True, collection, self) self.populate(self.mtime()) def on_event(self, *args, **kwargs): super(TmpCollectionDirectory, self).on_event(*args, **kwargs) - if self.collection_record_file: + if self.collection_record_file is None: + return - # See discussion in CollectionDirectoryBase.on_event - lockcount = 0 - try: - while True: - self.collection.lock.release() - lockcount += 1 - except RuntimeError: - pass + # See discussion in CollectionDirectoryBase.on_event + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass - try: - with llfuse.lock: - with self.collection.lock: - self.collection_record_file.invalidate() - self.inodes.invalidate_inode(self.collection_record_file) - _logger.debug("%s invalidated collection record", self) - finally: - while lockcount > 0: - self.collection.lock.acquire() - lockcount -= 1 + try: + with llfuse.lock: + with self.collection.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record", self) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def collection_record(self): with llfuse.lock_released: @@ -683,6 +711,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def writable(self): return True + def flush(self): + pass + def want_event_subscribe(self): return False @@ -973,11 +1004,13 @@ class ProjectDirectory(Directory): uuid=self.project_uuid, filters=[["uuid", "is_a", "arvados#group"], ["groups.group_class", "in", ["project","filter"]]])) - contents.extend(arvados.util.keyset_list_all(self.api.groups().contents, + contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"], + arvados.util.keyset_list_all(self.api.groups().contents, order_key="uuid", num_retries=self.num_retries, uuid=self.project_uuid, - filters=[["uuid", "is_a", "arvados#collection"]])) + filters=[["uuid", "is_a", "arvados#collection"]]))) + # end with llfuse.lock_released, re-acquire lock