Merge branch '12430-output-glob'
[arvados.git] services/fuse/arvados_fuse/fusedir.py
index e3b8dd4c2cca29616626dab55f6d440c22b58f51..9c78805107358dadf8b2f87221154753399b2c63 100644
@@ -36,7 +36,9 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
-    def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
+    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
+
+    def __init__(self, parent_inode, inodes, enable_write, filters):
         """parent_inode is the integer inode number"""
 
         super(Directory, self).__init__()
@@ -46,7 +48,6 @@ class Directory(FreshBase):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
         self.inodes = inodes
-        self.apiconfig = apiconfig
         self._entries = {}
         self._mtime = time.time()
         self._enable_write = enable_write
@@ -64,23 +65,9 @@ class Directory(FreshBase):
             else:
                 yield [f_name, *f[1:]]
 
-    def forward_slash_subst(self):
-        if not hasattr(self, '_fsns'):
-            self._fsns = None
-            config = self.apiconfig()
-            try:
-                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
-            except KeyError:
-                # old API server with no FSNS config
-                self._fsns = '_'
-            else:
-                if self._fsns == '' or self._fsns == '/':
-                    self._fsns = None
-        return self._fsns
-
     def unsanitize_filename(self, incoming):
         """Replace ForwardSlashNameSubstitution value with /"""
-        fsns = self.forward_slash_subst()
+        fsns = self.inodes.forward_slash_subst()
         if isinstance(fsns, str):
             return incoming.replace(fsns, '/')
         else:
@@ -99,7 +86,7 @@ class Directory(FreshBase):
         elif dirty == '..':
             return '__'
         else:
-            fsns = self.forward_slash_subst()
+            fsns = self.inodes.forward_slash_subst()
             if isinstance(fsns, str):
                 dirty = dirty.replace('/', fsns)
             return _disallowed_filename_characters.sub('_', dirty)
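For reference, a minimal sketch of the slash substitution that sanitize_filename() and unsanitize_filename() now delegate to the inodes object; the configured value and the file name below are assumed for illustration:

    # Assumed value; in practice it comes from inodes.forward_slash_subst(),
    # which reads Collections.ForwardSlashNameSubstitution from the cluster config.
    fsns = '_'
    api_name = 'results/summary.txt'         # name as stored in the collection
    on_disk = api_name.replace('/', fsns)    # sanitize_filename: 'results_summary.txt'
    restored = on_disk.replace(fsns, '/')    # unsanitize_filename: 'results/summary.txt'
    assert restored == api_name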
@@ -150,6 +137,10 @@ class Directory(FreshBase):
         self.inodes.touch(self)
         super(Directory, self).fresh()
 
+    def objsize(self):
+        # Rough estimate of memory footprint based on using pympler
+        return len(self._entries) * 1024
+
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
 
@@ -157,16 +148,17 @@ class Directory(FreshBase):
         entries that are the same in both the old and new lists, create new
         entries, and delete old entries missing from the new list.
 
-        :items: iterable with new directory contents
+        Arguments:
+        * items: Iterable --- New directory contents
 
-        :fn: function to take an entry in 'items' and return the desired file or
+        * fn: Callable --- Takes an entry in 'items' and returns the desired file or
         directory name, or None if this entry should be skipped
 
-        :same: function to compare an existing entry (a File or Directory
+        * same: Callable --- Compare an existing entry (a File or Directory
         object) with an entry in the items list to determine whether to keep
         the existing entry.
 
-        :new_entry: function to create a new directory entry (File or Directory
+        * new_entry: Callable --- Create a new directory entry (File or Directory
         object) from an entry in the items list.
 
         """
@@ -176,29 +168,43 @@ class Directory(FreshBase):
         changed = False
         for i in items:
             name = self.sanitize_filename(fn(i))
-            if name:
-                if name in oldentries and same(oldentries[name], i):
+            if not name:
+                continue
+            if name in oldentries:
+                ent = oldentries[name]
+                if same(ent, i) and ent.parent_inode == self.inode:
                     # move existing directory entry over
-                    self._entries[name] = oldentries[name]
+                    self._entries[name] = ent
                     del oldentries[name]
-                else:
-                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
-                    # create new directory entry
-                    ent = new_entry(i)
-                    if ent is not None:
-                        self._entries[name] = self.inodes.add_entry(ent)
-                        changed = True
+                    self.inodes.inode_cache.touch(ent)
+
+        for i in items:
+            name = self.sanitize_filename(fn(i))
+            if not name:
+                continue
+            if name not in self._entries:
+                # create new directory entry
+                ent = new_entry(i)
+                if ent is not None:
+                    self._entries[name] = self.inodes.add_entry(ent)
+                    # need to invalidate this just in case there was a
+                    # previous entry that couldn't be moved over or a
+                    # lookup that returned file not found and cached
+                    # a negative result
+                    self.inodes.invalidate_entry(self, name)
+                    changed = True
+                    _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
 
         # delete any other directory entries that were not found in 'items'
-        for i in oldentries:
-            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
-            self.inodes.invalidate_entry(self, i)
-            self.inodes.del_entry(oldentries[i])
+        for name, ent in oldentries.items():
+            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
             changed = True
 
         if changed:
-            self.inodes.invalidate_inode(self)
             self._mtime = time.time()
+            self.inodes.inode_cache.update_cache_size(self)
 
         self.fresh()
 
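To make the merge() contract concrete, here is a hedged usage sketch; the record contents, the project_dir instance, and the remaining CollectionDirectory constructor arguments are illustrative stand-ins rather than part of this change:

    # Hypothetical listing containing a single API record.
    records = [{'uuid': 'zzzzz-4zz18-xxxxxxxxxxxxxxx', 'name': 'analysis output'}]
    project_dir.merge(
        records,
        lambda rec: rec['name'],                     # fn: pick the entry name (merge sanitizes it)
        lambda ent, rec: ent.uuid() == rec['uuid'],  # same: keep the existing entry if it still matches
        lambda rec: CollectionDirectory(             # new_entry: build a fresh entry from the record
            project_dir.inode, project_dir.inodes, api, num_retries,
            enable_write, filters, rec),
    )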
@@ -210,27 +216,27 @@ class Directory(FreshBase):
                 return True
         return False
 
-    def has_ref(self, only_children):
-        if super(Directory, self).has_ref(only_children):
-            return True
-        for v in self._entries.values():
-            if v.has_ref(False):
-                return True
-        return False
-
     def clear(self):
         """Delete all entries"""
+        if not self._entries:
+            return
         oldentries = self._entries
         self._entries = {}
-        for n in oldentries:
-            oldentries[n].clear()
-            self.inodes.del_entry(oldentries[n])
         self.invalidate()
+        for name, ent in oldentries.items():
+            ent.clear()
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
+        self.inodes.inode_cache.update_cache_size(self)
 
     def kernel_invalidate(self):
         # Invalidating the dentry on the parent implies invalidating all paths
         # below it as well.
-        parent = self.inodes[self.parent_inode]
+        if self.parent_inode in self.inodes:
+            parent = self.inodes[self.parent_inode]
+        else:
+            # parent was removed already.
+            return
 
         # Find self on the parent in order to invalidate this path.
         # Calling the public items() method might trigger a refresh,
@@ -283,9 +289,10 @@ class CollectionDirectoryBase(Directory):
 
     """
 
-    def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
-        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
-        self.apiconfig = apiconfig
+    __slots__ = ("collection", "collection_root", "collection_record_file")
+
+    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
+        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
         self.collection = collection
         self.collection_root = collection_root
         self.collection_record_file = None
@@ -293,17 +300,16 @@ class CollectionDirectoryBase(Directory):
     def new_entry(self, name, item, mtime):
         name = self.sanitize_filename(name)
         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
-            if item.fuse_entry.dead is not True:
-                raise Exception("Can only reparent dead inode entry")
+            if item.fuse_entry.parent_inode is not None:
+                raise Exception("Can only reparent unparented inode entry")
             if item.fuse_entry.inode is None:
                 raise Exception("Reparented entry must still have valid inode")
-            item.fuse_entry.dead = False
+            item.fuse_entry.parent_inode = self.inode
             self._entries[name] = item.fuse_entry
         elif isinstance(item, arvados.collection.RichCollectionBase):
             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
                 self.inode,
                 self.inodes,
-                self.apiconfig,
                 self._enable_write,
                 self._filters,
                 item,
@@ -449,14 +455,23 @@ class CollectionDirectoryBase(Directory):
 
     def clear(self):
         super(CollectionDirectoryBase, self).clear()
+        if self.collection is not None:
+            self.collection.unsubscribe()
         self.collection = None
 
+    def objsize(self):
+        # objsize for the whole collection is represented at the root,
+        # don't double-count it
+        return 0
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
 
+    __slots__ = ("api", "num_retries", "collection_locator",
+                 "_manifest_size", "_writable", "_updating_lock")
+
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
-        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -514,7 +529,10 @@ class CollectionDirectory(CollectionDirectoryBase):
         if self.collection_record_file is not None:
             self.collection_record_file.invalidate()
             self.inodes.invalidate_inode(self.collection_record_file)
-            _logger.debug("%s invalidated collection record file", self)
+            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
+                          self.collection_record_file.inode)
+        self.inodes.update_uuid(self)
+        self.inodes.inode_cache.update_cache_size(self)
         self.fresh()
 
     def uuid(self):
@@ -592,6 +610,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         return False
 
     @use_counter
+    @check_update
     def collection_record(self):
         self.flush()
         return self.collection.api_response()
@@ -625,22 +644,32 @@ class CollectionDirectory(CollectionDirectoryBase):
         return (self.collection_locator is not None)
 
     def objsize(self):
-        # This is an empirically-derived heuristic to estimate the memory used
-        # to store this collection's metadata.  Calculating the memory
-        # footprint directly would be more accurate, but also more complicated.
-        return self._manifest_size * 128
+        # This is a rough guess of the amount of overhead involved for
+        # a collection; the assumption is that each file averages 128
+        # bytes in the manifest but consumes about 1024 bytes of Python
+        # data structures, so 1024/128=8 means we estimate the RAM
+        # footprint at 8 times the size of the bare manifest text.
+        return self._manifest_size * 8
 
     def finalize(self):
-        if self.collection is not None:
-            if self.writable():
+        if self.collection is None:
+            return
+
+        if self.writable():
+            try:
                 self.collection.save()
-            self.collection.stop_threads()
+            except Exception as e:
+                _logger.exception("Failed to save collection %s", self.collection_locator)
+        self.collection.stop_threads()
 
     def clear(self):
         if self.collection is not None:
             self.collection.stop_threads()
-        super(CollectionDirectory, self).clear()
         self._manifest_size = 0
+        super(CollectionDirectory, self).clear()
+        if self.collection_record_file is not None:
+            self.inodes.del_entry(self.collection_record_file)
+        self.collection_record_file = None
 
 
 class TmpCollectionDirectory(CollectionDirectoryBase):
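As a worked example of the objsize() heuristic for CollectionDirectory in the hunk above, with assumed rather than measured numbers: a collection whose manifest text is 1 MiB would be charged roughly 8 MiB against the inode cache.

    manifest_size = 1024 * 1024          # 1 MiB of manifest text (assumed)
    estimated_ram = manifest_size * 8    # ~1024 bytes of Python objects per ~128-byte manifest entry
    assert estimated_ram == 8 * 1024 * 1024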
@@ -667,7 +696,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
         # This is always enable_write=True because it never tries to
         # save to the backend
         super(TmpCollectionDirectory, self).__init__(
-            parent_inode, inodes, api_client.config, True, filters, collection, self)
+            parent_inode, inodes, True, filters, collection, self)
         self.populate(self.mtime())
 
     def on_event(self, *args, **kwargs):
@@ -689,7 +718,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
                 with self.collection.lock:
                     self.collection_record_file.invalidate()
                     self.inodes.invalidate_inode(self.collection_record_file)
-                    _logger.debug("%s invalidated collection record", self)
+                    _logger.debug("%s invalidated collection record", self.inode)
         finally:
             while lockcount > 0:
                 self.collection.lock.acquire()
@@ -764,7 +793,7 @@ and the directory will appear if it exists.
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
-        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.pdh_only = pdh_only
@@ -863,7 +892,7 @@ class TagsDirectory(Directory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -943,7 +972,7 @@ class TagDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                  poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
@@ -984,9 +1013,13 @@ class TagDirectory(Directory):
 class ProjectDirectory(Directory):
     """A special directory that contains the contents of a project."""
 
+    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
+                 "project_uuid", "_updating_lock",
+                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
+
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                  project_object, poll=True, poll_time=3, storage_classes=None):
-        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
@@ -998,6 +1031,19 @@ class ProjectDirectory(Directory):
         self._current_user = None
         self._full_listing = False
         self.storage_classes = storage_classes
+        self.recursively_contained = False
+
+        # Filter groups can contain themselves, which causes tools
+        # that walk the filesystem to get stuck in an infinite loop,
+        # so suppress returning a listing in that case.
+        if self.project_object.get("group_class") == "filter":
+            iter_parent_inode = parent_inode
+            while iter_parent_inode != llfuse.ROOT_INODE:
+                parent_dir = self.inodes[iter_parent_inode]
+                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
+                    self.recursively_contained = True
+                    break
+                iter_parent_inode = parent_dir.parent_inode
 
     def want_event_subscribe(self):
         return True
@@ -1048,7 +1094,7 @@ class ProjectDirectory(Directory):
             self.project_object_file = ObjectFile(self.inode, self.project_object)
             self.inodes.add_entry(self.project_object_file)
 
-        if not self._full_listing:
+        if self.recursively_contained or not self._full_listing:
             return True
 
         def samefn(a, i):
@@ -1092,7 +1138,6 @@ class ProjectDirectory(Directory):
                         *self._filters_for('collections', qualified=True),
                     ],
                 ) if obj['current_version_uuid'] == obj['uuid'])
-
             # end with llfuse.lock_released, re-acquire lock
 
             self.merge(contents,
@@ -1175,6 +1220,12 @@ class ProjectDirectory(Directory):
     def persisted(self):
         return True
 
+    def clear(self):
+        super(ProjectDirectory, self).clear()
+        if self.project_object_file is not None:
+            self.inodes.del_entry(self.project_object_file)
+        self.project_object_file = None
+
     @use_counter
     @check_update
     def mkdir(self, name):
@@ -1294,7 +1345,7 @@ class SharedDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                  exclude, poll=False, poll_time=60, storage_classes=None):
-        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.current_user = api.users().current().execute(num_retries=num_retries)