Merge branch '17995-filter-by-comparing-attrs'
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 78cbd0d8cfd06f1c638549151c56e74a32025237..d5a018ae88fcd859adc3047ad2384732a0bfbe92 100644 (file)
@@ -2,11 +2,6 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-from __future__ import absolute_import
-from __future__ import division
-from future.utils import viewitems
-from future.utils import itervalues
-from builtins import dict
 import apiclient
 import arvados
 import errno
@@ -196,7 +191,7 @@ class Directory(FreshBase):
     def in_use(self):
         if super(Directory, self).in_use():
             return True
-        for v in itervalues(self._entries):
+        for v in self._entries.values():
             if v.in_use():
                 return True
         return False
@@ -204,7 +199,7 @@ class Directory(FreshBase):
     def has_ref(self, only_children):
         if super(Directory, self).has_ref(only_children):
             return True
-        for v in itervalues(self._entries):
+        for v in self._entries.values():
             if v.has_ref(False):
                 return True
         return False
@@ -226,7 +221,7 @@ class Directory(FreshBase):
         # Find self on the parent in order to invalidate this path.
         # Calling the public items() method might trigger a refresh,
         # which we definitely don't want, so read the internal dict directly.
-        for k,v in viewitems(parent._entries):
+        for k,v in parent._entries.items():
             if v is self:
                 self.inodes.invalidate_entry(parent, k)
                 break
@@ -298,26 +293,59 @@ class CollectionDirectoryBase(Directory):
     def on_event(self, event, collection, name, item):
         if collection == self.collection:
             name = self.sanitize_filename(name)
-            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
-            with llfuse.lock:
-                if event == arvados.collection.ADD:
-                    self.new_entry(name, item, self.mtime())
-                elif event == arvados.collection.DEL:
-                    ent = self._entries[name]
-                    del self._entries[name]
-                    self.inodes.invalidate_entry(self, name)
-                    self.inodes.del_entry(ent)
-                elif event == arvados.collection.MOD:
-                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
-                        self.inodes.invalidate_inode(item.fuse_entry)
-                    elif name in self._entries:
-                        self.inodes.invalidate_inode(self._entries[name])
+
+            #
+            # It's possible for another thread to have llfuse.lock and
+            # be waiting on collection.lock.  Meanwhile, we released
+            # llfuse.lock earlier in the stack, but are still holding
+            # on to the collection lock, and now we need to re-acquire
+            # llfuse.lock.  If we don't release the collection lock,
+            # we'll deadlock where we're holding the collection lock
+            # waiting for llfuse.lock and the other thread is holding
+            # llfuse.lock and waiting for the collection lock.
+            #
+            # The correct locking order here is to take llfuse.lock
+            # first, then the collection lock.
+            #
+            # Since collection.lock is an RLock, it might be locked
+            # multiple times, so we need to release it multiple times,
+            # keep a count, then re-lock it the correct number of
+            # times.
+            #
+            lockcount = 0
+            try:
+                while True:
+                    self.collection.lock.release()
+                    lockcount += 1
+            except RuntimeError:
+                pass
+
+            try:
+                with llfuse.lock:
+                    with self.collection.lock:
+                        if event == arvados.collection.ADD:
+                            self.new_entry(name, item, self.mtime())
+                        elif event == arvados.collection.DEL:
+                            ent = self._entries[name]
+                            del self._entries[name]
+                            self.inodes.invalidate_entry(self, name)
+                            self.inodes.del_entry(ent)
+                        elif event == arvados.collection.MOD:
+                            if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+                                self.inodes.invalidate_inode(item.fuse_entry)
+                            elif name in self._entries:
+                                self.inodes.invalidate_inode(self._entries[name])
+            finally:
+                while lockcount > 0:
+                    self.collection.lock.acquire()
+                    lockcount -= 1
 
     def populate(self, mtime):
         self._mtime = mtime
-        self.collection.subscribe(self.on_event)
-        for entry, item in viewitems(self.collection):
-            self.new_entry(entry, item, self.mtime())
+        with self.collection.lock:
+            self.collection.subscribe(self.on_event)
+            for entry, item in self.collection.items():
+                self.new_entry(entry, item, self.mtime())
 
     def writable(self):
         return self.collection.writable()
@@ -464,6 +492,7 @@ class CollectionDirectory(CollectionDirectoryBase):
                         return
 
                     _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
+                    new_collection_record = None
                     if self.collection is not None:
                         if self.collection.known_past_version(to_record_version):
                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
@@ -490,12 +519,13 @@ class CollectionDirectory(CollectionDirectoryBase):
                         if 'storage_classes_desired' not in new_collection_record:
                             new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
 
-                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
-                            self.new_collection(new_collection_record, coll_reader)
-
-                        self._manifest_size = len(coll_reader.manifest_text())
-                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                 # end with llfuse.lock_released, re-acquire lock
+                if (new_collection_record is not None and
+                    (self.collection_record is None or
+                     self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
+                    self.new_collection(new_collection_record, coll_reader)
+                    self._manifest_size = len(coll_reader.manifest_text())
+                    _logger.debug("%s manifest_size %i", self, self._manifest_size)
 
                 self.fresh()
                 return True
@@ -587,10 +617,26 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def on_event(self, *args, **kwargs):
         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
         if self.collection_record_file:
-            with llfuse.lock:
-                self.collection_record_file.invalidate()
-            self.inodes.invalidate_inode(self.collection_record_file)
-            _logger.debug("%s invalidated collection record", self)
+
+            # See discussion in CollectionDirectoryBase.on_event
+            lockcount = 0
+            try:
+                while True:
+                    self.collection.lock.release()
+                    lockcount += 1
+            except RuntimeError:
+                pass
+
+            try:
+                with llfuse.lock:
+                    with self.collection.lock:
+                        self.collection_record_file.invalidate()
+                        self.inodes.invalidate_inode(self.collection_record_file)
+                        _logger.debug("%s invalidated collection record", self)
+            finally:
+                while lockcount > 0:
+                    self.collection.lock.acquire()
+                    lockcount -= 1
 
     def collection_record(self):
         with llfuse.lock_released:
@@ -1182,7 +1228,7 @@ class SharedDirectory(Directory):
 
             # end with llfuse.lock_released, re-acquire lock
 
-            self.merge(viewitems(contents),
+            self.merge(contents.items(),
                        lambda i: i[0],
                        lambda a, i: a.uuid() == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))