X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c381502b54b67080ba76667c6482c3711d3c10a4..7aaf9f22aa646077b4b7fd961d6b731185b88137:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index a5d79c9486..d5a018ae88 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -2,11 +2,6 @@ # # SPDX-License-Identifier: AGPL-3.0 -from __future__ import absolute_import -from __future__ import division -from future.utils import viewitems -from future.utils import itervalues -from builtins import dict import apiclient import arvados import errno @@ -196,7 +191,7 @@ class Directory(FreshBase): def in_use(self): if super(Directory, self).in_use(): return True - for v in itervalues(self._entries): + for v in self._entries.values(): if v.in_use(): return True return False @@ -204,7 +199,7 @@ class Directory(FreshBase): def has_ref(self, only_children): if super(Directory, self).has_ref(only_children): return True - for v in itervalues(self._entries): + for v in self._entries.values(): if v.has_ref(False): return True return False @@ -226,7 +221,7 @@ class Directory(FreshBase): # Find self on the parent in order to invalidate this path. # Calling the public items() method might trigger a refresh, # which we definitely don't want, so read the internal dict directly. - for k,v in viewitems(parent._entries): + for k,v in parent._entries.items(): if v is self: self.inodes.invalidate_entry(parent, k) break @@ -298,26 +293,59 @@ class CollectionDirectoryBase(Directory): def on_event(self, event, collection, name, item): if collection == self.collection: name = self.sanitize_filename(name) - _logger.debug("collection notify %s %s %s %s", event, collection, name, item) - with llfuse.lock: - if event == arvados.collection.ADD: - self.new_entry(name, item, self.mtime()) - elif event == arvados.collection.DEL: - ent = self._entries[name] - del self._entries[name] - self.inodes.invalidate_entry(self, name) - self.inodes.del_entry(ent) - elif event == arvados.collection.MOD: - if hasattr(item, "fuse_entry") and item.fuse_entry is not None: - self.inodes.invalidate_inode(item.fuse_entry) - elif name in self._entries: - self.inodes.invalidate_inode(self._entries[name]) + + # + # It's possible for another thread to have llfuse.lock and + # be waiting on collection.lock. Meanwhile, we released + # llfuse.lock earlier in the stack, but are still holding + # on to the collection lock, and now we need to re-acquire + # llfuse.lock. If we don't release the collection lock, + # we'll deadlock where we're holding the collection lock + # waiting for llfuse.lock and the other thread is holding + # llfuse.lock and waiting for the collection lock. + # + # The correct locking order here is to take llfuse.lock + # first, then the collection lock. + # + # Since collection.lock is an RLock, it might be locked + # multiple times, so we need to release it multiple times, + # keep a count, then re-lock it the correct number of + # times. + # + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass + + try: + with llfuse.lock: + with self.collection.lock: + if event == arvados.collection.ADD: + self.new_entry(name, item, self.mtime()) + elif event == arvados.collection.DEL: + ent = self._entries[name] + del self._entries[name] + self.inodes.invalidate_entry(self, name) + self.inodes.del_entry(ent) + elif event == arvados.collection.MOD: + if hasattr(item, "fuse_entry") and item.fuse_entry is not None: + self.inodes.invalidate_inode(item.fuse_entry) + elif name in self._entries: + self.inodes.invalidate_inode(self._entries[name]) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def populate(self, mtime): self._mtime = mtime - self.collection.subscribe(self.on_event) - for entry, item in viewitems(self.collection): - self.new_entry(entry, item, self.mtime()) + with self.collection.lock: + self.collection.subscribe(self.on_event) + for entry, item in self.collection.items(): + self.new_entry(entry, item, self.mtime()) def writable(self): return self.collection.writable() @@ -464,6 +492,7 @@ class CollectionDirectory(CollectionDirectoryBase): return _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) + new_collection_record = None if self.collection is not None: if self.collection.known_past_version(to_record_version): _logger.debug("%s already processed %s", self.collection_locator, to_record_version) @@ -487,13 +516,16 @@ class CollectionDirectory(CollectionDirectoryBase): new_collection_record["portable_data_hash"] = new_collection_record["uuid"] if 'manifest_text' not in new_collection_record: new_collection_record['manifest_text'] = coll_reader.manifest_text() + if 'storage_classes_desired' not in new_collection_record: + new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired() - if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"): - self.new_collection(new_collection_record, coll_reader) - - self._manifest_size = len(coll_reader.manifest_text()) - _logger.debug("%s manifest_size %i", self, self._manifest_size) # end with llfuse.lock_released, re-acquire lock + if (new_collection_record is not None and + (self.collection_record is None or + self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))): + self.new_collection(new_collection_record, coll_reader) + self._manifest_size = len(coll_reader.manifest_text()) + _logger.debug("%s manifest_size %i", self, self._manifest_size) self.fresh() return True @@ -585,10 +617,26 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def on_event(self, *args, **kwargs): super(TmpCollectionDirectory, self).on_event(*args, **kwargs) if self.collection_record_file: - with llfuse.lock: - self.collection_record_file.invalidate() - self.inodes.invalidate_inode(self.collection_record_file) - _logger.debug("%s invalidated collection record", self) + + # See discussion in CollectionDirectoryBase.on_event + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass + + try: + with llfuse.lock: + with self.collection.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record", self) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def collection_record(self): with llfuse.lock_released: @@ -596,6 +644,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase): "uuid": None, "manifest_text": self.collection.manifest_text(), "portable_data_hash": self.collection.portable_data_hash(), + "storage_classes_desired": self.collection.storage_classes_desired(), } def __contains__(self, k): @@ -1179,7 +1228,7 @@ class SharedDirectory(Directory): # end with llfuse.lock_released, re-acquire lock - self.merge(viewitems(contents), + self.merge(contents.items(), lambda i: i[0], lambda a, i: a.uuid() == i[1]['uuid'], lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))