18941: Need to leave some space for current block
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index a2e33c7b3bcc47b7d8288dc60168f75ec151b647..8dcde59ec336e2ba4cb872f7a7fbdc2faa755807 100644 (file)
@@ -270,10 +270,12 @@ class CollectionDirectoryBase(Directory):
 
     """
 
-    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection):
+    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
         self.apiconfig = apiconfig
         self.collection = collection
+        self.collection_root = collection_root
+        self.collection_record_file = None
 
     def new_entry(self, name, item, mtime):
         name = self.sanitize_filename(name)
@@ -285,13 +287,17 @@ class CollectionDirectoryBase(Directory):
             item.fuse_entry.dead = False
             self._entries[name] = item.fuse_entry
         elif isinstance(item, arvados.collection.RichCollectionBase):
-            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item))
+            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
             self._entries[name].populate(mtime)
         else:
             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
         item.fuse_entry = self._entries[name]
 
     def on_event(self, event, collection, name, item):
+        # These are events from the Collection object (ADD/DEL/MOD)
+        # emitted by operations on the Collection object (like
+        # "mkdirs" or "remove"), and by "update", which we need to
+        # synchronize with our FUSE objects that are assigned inodes.
         if collection == self.collection:
             name = self.sanitize_filename(name)
 
@@ -336,6 +342,10 @@ class CollectionDirectoryBase(Directory):
                                 self.inodes.invalidate_inode(item.fuse_entry)
                             elif name in self._entries:
                                 self.inodes.invalidate_inode(self._entries[name])
+
+                        if self.collection_record_file is not None:
+                            self.collection_record_file.invalidate()
+                            self.inodes.invalidate_inode(self.collection_record_file)
             finally:
                 while lockcount > 0:
                     self.collection.lock.acquire()
@@ -353,10 +363,7 @@ class CollectionDirectoryBase(Directory):
 
     @use_counter
     def flush(self):
-        if not self.writable():
-            return
-        with llfuse.lock_released:
-            self.collection.root_collection().save()
+        self.collection_root.flush()
 
     @use_counter
     @check_update
@@ -428,11 +435,9 @@ class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
-        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None)
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
         self.api = api
         self.num_retries = num_retries
-        self.collection_record_file = None
-        self.collection_record = None
         self._poll = True
         try:
             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
@@ -457,70 +462,82 @@ class CollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
 
+    @use_counter
+    def flush(self):
+        if not self.writable():
+            return
+        with llfuse.lock_released:
+            with self._updating_lock:
+                if self.collection.committed():
+                    self.collection.update()
+                else:
+                    self.collection.save()
+                self.new_collection_record(self.collection.api_response())
+
     def want_event_subscribe(self):
         return (uuid_pattern.match(self.collection_locator) is not None)
 
-    # Used by arv-web.py to switch the contents of the CollectionDirectory
-    def change_collection(self, new_locator):
-        """Switch the contents of the CollectionDirectory.
-
-        Must be called with llfuse.lock held.
-        """
-
-        self.collection_locator = new_locator
-        self.collection_record = None
-        self.update()
-
     def new_collection(self, new_collection_record, coll_reader):
         if self.inode:
             self.clear()
-
-        self.collection_record = new_collection_record
-
-        if self.collection_record:
-            self._mtime = convertTime(self.collection_record.get('modified_at'))
-            self.collection_locator = self.collection_record["uuid"]
-            if self.collection_record_file is not None:
-                self.collection_record_file.update(self.collection_record)
-
         self.collection = coll_reader
+        self.new_collection_record(new_collection_record)
         self.populate(self.mtime())
 
+    def new_collection_record(self, new_collection_record):
+        if not new_collection_record:
+            raise Exception("invalid new_collection_record")
+        self._mtime = convertTime(new_collection_record.get('modified_at'))
+        self._manifest_size = len(new_collection_record["manifest_text"])
+        self.collection_locator = new_collection_record["uuid"]
+        if self.collection_record_file is not None:
+            self.collection_record_file.invalidate()
+            self.inodes.invalidate_inode(self.collection_record_file)
+            _logger.debug("%s invalidated collection record file", self)
+        self.fresh()
+
     def uuid(self):
         return self.collection_locator
 
     @use_counter
-    def update(self, to_record_version=None):
+    def update(self):
         try:
-            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
+            if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
+                # It's immutable, nothing to update
                 return True
 
             if self.collection_locator is None:
+                # No collection locator to retrieve from
                 self.fresh()
                 return True
 
+            new_collection_record = None
             try:
                 with llfuse.lock_released:
                     self._updating_lock.acquire()
                     if not self.stale():
-                        return
+                        return True
 
-                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
-                    new_collection_record = None
+                    _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
+                    coll_reader = None
                     if self.collection is not None:
-                        if self.collection.known_past_version(to_record_version):
-                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
-                        else:
-                            self.collection.update()
+                        # Already have a collection object
+                        self.collection.update()
+                        new_collection_record = self.collection.api_response()
                     else:
+                        get_threads = max(self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 0)
+                        # Create a new collection object
                         if uuid_pattern.match(self.collection_locator):
                             coll_reader = arvados.collection.Collection(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries)
+                                num_retries=self.num_retries,
+                                get_threads=get_threads)
                         else:
                             coll_reader = arvados.collection.CollectionReader(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries)
+                                num_retries=self.num_retries,
+                                get_threads=get_threads)
+                            )
                         new_collection_record = coll_reader.api_response() or {}
                         # If the Collection only exists in Keep, there will be no API
                         # response.  Fill in the fields we need.
@@ -534,14 +551,13 @@ class CollectionDirectory(CollectionDirectoryBase):
                             new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
 
                 # end with llfuse.lock_released, re-acquire lock
-                if (new_collection_record is not None and
-                    (self.collection_record is None or
-                     self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
-                    self.new_collection(new_collection_record, coll_reader)
-                    self._manifest_size = len(coll_reader.manifest_text())
-                    _logger.debug("%s manifest_size %i", self, self._manifest_size)
 
-                self.fresh()
+                if new_collection_record is not None:
+                    if coll_reader is not None:
+                        self.new_collection(new_collection_record, coll_reader)
+                    else:
+                        self.new_collection_record(new_collection_record)
+
                 return True
             finally:
                 self._updating_lock.release()
@@ -549,22 +565,29 @@ class CollectionDirectory(CollectionDirectoryBase):
             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
-            if self.collection_record is not None and "manifest_text" in self.collection_record:
-                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+            if new_collection_record is not None and "manifest_text" in new_collection_record:
+                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
         except Exception:
             _logger.exception("arv-mount %s: error", self.collection_locator)
-            if self.collection_record is not None and "manifest_text" in self.collection_record:
-                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+            if new_collection_record is not None and "manifest_text" in new_collection_record:
+                _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
         self.invalidate()
         return False
 
+    @use_counter
+    def collection_record(self):
+        self.flush()
+        return self.collection.api_response()
+
     @use_counter
     @check_update
     def __getitem__(self, item):
         if item == '.arvados#collection':
             if self.collection_record_file is None:
-                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+                self.collection_record_file = FuncToJSONFile(
+                    self.inode, self.collection_record)
                 self.inodes.add_entry(self.collection_record_file)
+            self.invalidate()  # use lookup as a signal to force update
             return self.collection_record_file
         else:
             return super(CollectionDirectory, self).__getitem__(item)
@@ -576,8 +599,9 @@ class CollectionDirectory(CollectionDirectoryBase):
             return super(CollectionDirectory, self).__contains__(k)
 
     def invalidate(self):
-        self.collection_record = None
-        self.collection_record_file = None
+        if self.collection_record_file is not None:
+            self.collection_record_file.invalidate()
+            self.inodes.invalidate_inode(self.collection_record_file)
         super(CollectionDirectory, self).invalidate()
 
     def persisted(self):
@@ -626,33 +650,33 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
         # This is always enable_write=True because it never tries to
         # save to the backend
         super(TmpCollectionDirectory, self).__init__(
-            parent_inode, inodes, api_client.config, True, collection)
-        self.collection_record_file = None
+            parent_inode, inodes, api_client.config, True, collection, self)
         self.populate(self.mtime())
 
     def on_event(self, *args, **kwargs):
         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
-        if self.collection_record_file:
+        if self.collection_record_file is None:
+            return
 
-            # See discussion in CollectionDirectoryBase.on_event
-            lockcount = 0
-            try:
-                while True:
-                    self.collection.lock.release()
-                    lockcount += 1
-            except RuntimeError:
-                pass
+        # See discussion in CollectionDirectoryBase.on_event
+        lockcount = 0
+        try:
+            while True:
+                self.collection.lock.release()
+                lockcount += 1
+        except RuntimeError:
+            pass
 
-            try:
-                with llfuse.lock:
-                    with self.collection.lock:
-                        self.collection_record_file.invalidate()
-                        self.inodes.invalidate_inode(self.collection_record_file)
-                        _logger.debug("%s invalidated collection record", self)
-            finally:
-                while lockcount > 0:
-                    self.collection.lock.acquire()
-                    lockcount -= 1
+        try:
+            with llfuse.lock:
+                with self.collection.lock:
+                    self.collection_record_file.invalidate()
+                    self.inodes.invalidate_inode(self.collection_record_file)
+                    _logger.debug("%s invalidated collection record", self)
+        finally:
+            while lockcount > 0:
+                self.collection.lock.acquire()
+                lockcount -= 1
 
     def collection_record(self):
         with llfuse.lock_released:
@@ -683,6 +707,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return True
 
+    def flush(self):
+        pass
+
     def want_event_subscribe(self):
         return False
 
@@ -973,11 +1000,13 @@ class ProjectDirectory(Directory):
                                                         uuid=self.project_uuid,
                                                         filters=[["uuid", "is_a", "arvados#group"],
                                                                  ["groups.group_class", "in", ["project","filter"]]]))
-                contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
+                contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
+                                       arvados.util.keyset_list_all(self.api.groups().contents,
                                                              order_key="uuid",
                                                              num_retries=self.num_retries,
                                                              uuid=self.project_uuid,
-                                                             filters=[["uuid", "is_a", "arvados#collection"]]))
+                                                             filters=[["uuid", "is_a", "arvados#collection"]])))
+
 
             # end with llfuse.lock_released, re-acquire lock