X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/08acb72dd180391007554783a08d9213e5d6d6c0..1efba8f3b728a3b8aa3c64c5aa09f441318ff2a8:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index 3c262fa96c..8faf01cb6c 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -26,7 +26,7 @@ _logger = logging.getLogger('arvados.arvados_fuse') # Match any character which FUSE or Linux cannot accommodate as part # of a filename. (If present in a collection filename, they will # appear as underscores in the fuse mount.) -_disallowed_filename_characters = re.compile('[\x00/]') +_disallowed_filename_characters = re.compile(r'[\x00/]') class Directory(FreshBase): @@ -270,11 +270,12 @@ class CollectionDirectoryBase(Directory): """ - def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collectionRoot): + def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root): super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write) self.apiconfig = apiconfig self.collection = collection self.collection_root = collection_root + self.collection_record_file = None def new_entry(self, name, item, mtime): name = self.sanitize_filename(name) @@ -293,6 +294,10 @@ class CollectionDirectoryBase(Directory): item.fuse_entry = self._entries[name] def on_event(self, event, collection, name, item): + # These are events from the Collection object (ADD/DEL/MOD) + # emitted by operations on the Collection object (like + # "mkdirs" or "remove"), and by "update", which we need to + # synchronize with our FUSE objects that are assigned inodes. if collection == self.collection: name = self.sanitize_filename(name) @@ -337,6 +342,10 @@ class CollectionDirectoryBase(Directory): self.inodes.invalidate_inode(item.fuse_entry) elif name in self._entries: self.inodes.invalidate_inode(self._entries[name]) + + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) finally: while lockcount > 0: self.collection.lock.acquire() @@ -429,7 +438,6 @@ class CollectionDirectory(CollectionDirectoryBase): super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self) self.api = api self.num_retries = num_retries - self.collection_record_file = None self._poll = True try: self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2) @@ -459,22 +467,16 @@ class CollectionDirectory(CollectionDirectoryBase): if not self.writable(): return with llfuse.lock_released: - self.collection.save() - self.new_collection_record(self.collection.api_response()) + with self._updating_lock: + if self.collection.committed(): + self.collection.update() + else: + self.collection.save() + self.new_collection_record(self.collection.api_response()) def want_event_subscribe(self): return (uuid_pattern.match(self.collection_locator) is not None) - # Used by arv-web.py to switch the contents of the CollectionDirectory - def change_collection(self, new_locator): - """Switch the contents of the CollectionDirectory. - - Must be called with llfuse.lock held. - """ - - self.collection_locator = new_locator - self.update() - def new_collection(self, new_collection_record, coll_reader): if self.inode: self.clear() @@ -484,21 +486,21 @@ class CollectionDirectory(CollectionDirectoryBase): def new_collection_record(self, new_collection_record): if not new_collection_record: - self.collection_record_file = None - self._mtime = 0 - return + raise Exception("invalid new_collection_record") self._mtime = convertTime(new_collection_record.get('modified_at')) - self._manifest_size = len(self.collection.manifest_text()) + self._manifest_size = len(new_collection_record["manifest_text"]) self.collection_locator = new_collection_record["uuid"] if self.collection_record_file is not None: + self.collection_record_file.invalidate() self.inodes.invalidate_inode(self.collection_record_file) _logger.debug("%s invalidated collection record file", self) + self.fresh() def uuid(self): return self.collection_locator @use_counter - def update(self, to_record_version=None): + def update(self): try: if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator): # It's immutable, nothing to update @@ -514,17 +516,14 @@ class CollectionDirectory(CollectionDirectoryBase): with llfuse.lock_released: self._updating_lock.acquire() if not self.stale(): - return + return True - _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) + _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode) coll_reader = None if self.collection is not None: # Already have a collection object - if self.collection.known_past_version(to_record_version): - _logger.debug("%s already processed %s", self.collection_locator, to_record_version) - else: - self.collection.update() - new_collection_record = self.collection.api_response() + self.collection.update() + new_collection_record = self.collection.api_response() else: # Create a new collection object if uuid_pattern.match(self.collection_locator): @@ -553,9 +552,8 @@ class CollectionDirectory(CollectionDirectoryBase): if coll_reader is not None: self.new_collection(new_collection_record, coll_reader) else: - self.new_collection_record(new_collection_record, coll_reader) + self.new_collection_record(new_collection_record) - self.fresh() return True finally: self._updating_lock.release() @@ -568,13 +566,13 @@ class CollectionDirectory(CollectionDirectoryBase): except Exception: _logger.exception("arv-mount %s: error", self.collection_locator) if new_collection_record is not None and "manifest_text" in new_collection_record: - _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) + _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"]) self.invalidate() return False @use_counter - @check_update def collection_record(self): + self.flush() return self.collection.api_response() @use_counter @@ -585,6 +583,7 @@ class CollectionDirectory(CollectionDirectoryBase): self.collection_record_file = FuncToJSONFile( self.inode, self.collection_record) self.inodes.add_entry(self.collection_record_file) + self.invalidate() # use lookup as a signal to force update return self.collection_record_file else: return super(CollectionDirectory, self).__getitem__(item) @@ -597,6 +596,7 @@ class CollectionDirectory(CollectionDirectoryBase): def invalidate(self): if self.collection_record_file is not None: + self.collection_record_file.invalidate() self.inodes.invalidate_inode(self.collection_record_file) super(CollectionDirectory, self).invalidate() @@ -646,10 +646,33 @@ class TmpCollectionDirectory(CollectionDirectoryBase): # This is always enable_write=True because it never tries to # save to the backend super(TmpCollectionDirectory, self).__init__( - parent_inode, inodes, api_client.config, True, collection) - self.collection_record_file = None + parent_inode, inodes, api_client.config, True, collection, self) self.populate(self.mtime()) + def on_event(self, *args, **kwargs): + super(TmpCollectionDirectory, self).on_event(*args, **kwargs) + if self.collection_record_file is None: + return + + # See discussion in CollectionDirectoryBase.on_event + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass + + try: + with llfuse.lock: + with self.collection.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record", self) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def collection_record(self): with llfuse.lock_released: @@ -680,6 +703,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def writable(self): return True + def flush(self): + pass + def want_event_subscribe(self): return False