Merge branch '20637-prefetch-threads' refs #20637
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 704dbe43914826e384e55af12f7fe8a76b95abfd..7de95a0cb1b0d95bd1d67dcc58b5a3c406a863ff 100644 (file)
@@ -275,6 +275,7 @@ class CollectionDirectoryBase(Directory):
         self.apiconfig = apiconfig
         self.collection = collection
         self.collection_root = collection_root
+        self.collection_record_file = None
 
     def new_entry(self, name, item, mtime):
         name = self.sanitize_filename(name)
@@ -341,6 +342,10 @@ class CollectionDirectoryBase(Directory):
                                 self.inodes.invalidate_inode(item.fuse_entry)
                             elif name in self._entries:
                                 self.inodes.invalidate_inode(self._entries[name])
+
+                        if self.collection_record_file is not None:
+                            self.collection_record_file.invalidate()
+                            self.inodes.invalidate_inode(self.collection_record_file)
             finally:
                 while lockcount > 0:
                     self.collection.lock.acquire()
@@ -433,7 +438,6 @@ class CollectionDirectory(CollectionDirectoryBase):
         super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
         self.api = api
         self.num_retries = num_retries
-        self.collection_record_file = None
         self._poll = True
         try:
             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
@@ -463,8 +467,12 @@ class CollectionDirectory(CollectionDirectoryBase):
         if not self.writable():
             return
         with llfuse.lock_released:
-            self.collection.save()
-        self.new_collection_record(self.collection.api_response())
+            with self._updating_lock:
+                if self.collection.committed():
+                    self.collection.update()
+                else:
+                    self.collection.save()
+                self.new_collection_record(self.collection.api_response())
 
     def want_event_subscribe(self):
         return (uuid_pattern.match(self.collection_locator) is not None)
@@ -510,7 +518,7 @@ class CollectionDirectory(CollectionDirectoryBase):
                     if not self.stale():
                         return True
 
-                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode)
+                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
                     coll_reader = None
                     if self.collection is not None:
                         # Already have a collection object
@@ -563,8 +571,8 @@ class CollectionDirectory(CollectionDirectoryBase):
         return False
 
     @use_counter
-    @check_update
     def collection_record(self):
+        self.flush()
         return self.collection.api_response()
 
     @use_counter
@@ -575,6 +583,7 @@ class CollectionDirectory(CollectionDirectoryBase):
                 self.collection_record_file = FuncToJSONFile(
                     self.inode, self.collection_record)
                 self.inodes.add_entry(self.collection_record_file)
+            self.invalidate()  # use lookup as a signal to force update
             return self.collection_record_file
         else:
             return super(CollectionDirectory, self).__getitem__(item)
@@ -637,10 +646,33 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
         # This is always enable_write=True because it never tries to
         # save to the backend
         super(TmpCollectionDirectory, self).__init__(
-            parent_inode, inodes, api_client.config, True, collection)
-        self.collection_record_file = None
+            parent_inode, inodes, api_client.config, True, collection, self)
         self.populate(self.mtime())
 
+    def on_event(self, *args, **kwargs):
+        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+        if self.collection_record_file is None:
+            return  # no record file object exists, so nothing to invalidate
+
+        # Drop every hold on collection.lock (counted in lockcount) so llfuse.lock is taken first below; see discussion in CollectionDirectoryBase.on_event
+        lockcount = 0
+        try:
+            while True:
+                self.collection.lock.release()
+                lockcount += 1
+        except RuntimeError:  # the loop ends once release() raises
+            pass
+
+        try:
+            with llfuse.lock:
+                with self.collection.lock:
+                    self.collection_record_file.invalidate()
+                    self.inodes.invalidate_inode(self.collection_record_file)
+                    _logger.debug("%s invalidated collection record", self)
+        finally:  # re-acquire collection.lock as many times as it was released above
+            while lockcount > 0:
+                self.collection.lock.acquire()
+                lockcount -= 1
 
     def collection_record(self):
         with llfuse.lock_released:
@@ -671,6 +703,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return True
 
+    def flush(self):
+        pass  # deliberate no-op: this directory never tries to save to the backend
+
     def want_event_subscribe(self):
         return False