Merge branch '12430-output-glob'
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index e3b8dd4c2cca29616626dab55f6d440c22b58f51..9c78805107358dadf8b2f87221154753399b2c63 100644 (file)
@@ -36,7 +36,9 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
-    def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
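+    # Pre-declaring attributes with __slots__ reduces per-instance
+    # memory overhead, which matters when many directory inodes are
+    # cached at once.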
+    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
+
+    def __init__(self, parent_inode, inodes, enable_write, filters):
         """parent_inode is the integer inode number"""
 
         super(Directory, self).__init__()
@@ -46,7 +48,6 @@ class Directory(FreshBase):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
         self.inodes = inodes
-        self.apiconfig = apiconfig
         self._entries = {}
         self._mtime = time.time()
         self._enable_write = enable_write
@@ -64,23 +65,9 @@ class Directory(FreshBase):
             else:
                 yield [f_name, *f[1:]]
 
-    def forward_slash_subst(self):
-        if not hasattr(self, '_fsns'):
-            self._fsns = None
-            config = self.apiconfig()
-            try:
-                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
-            except KeyError:
-                # old API server with no FSNS config
-                self._fsns = '_'
-            else:
-                if self._fsns == '' or self._fsns == '/':
-                    self._fsns = None
-        return self._fsns
-
     def unsanitize_filename(self, incoming):
         """Replace ForwardSlashNameSubstitution value with /"""
-        fsns = self.forward_slash_subst()
+        fsns = self.inodes.forward_slash_subst()
         if isinstance(fsns, str):
             return incoming.replace(fsns, '/')
         else:
@@ -99,7 +86,7 @@ class Directory(FreshBase):
         elif dirty == '..':
             return '__'
         else:
-            fsns = self.forward_slash_subst()
+            fsns = self.inodes.forward_slash_subst()
             if isinstance(fsns, str):
                 dirty = dirty.replace('/', fsns)
             return _disallowed_filename_characters.sub('_', dirty)
@@ -150,6 +137,10 @@ class Directory(FreshBase):
         self.inodes.touch(self)
         super(Directory, self).fresh()
 
+    def objsize(self):
+        # Rough estimate of memory footprint, based on measurements with pympler
+        return len(self._entries) * 1024
+
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
 
@@ -157,16 +148,17 @@ class Directory(FreshBase):
         entries that are the same in both the old and new lists, create new
         entries, and delete old entries missing from the new list.
 
-        :items: iterable with new directory contents
+        Arguments:
+        * items: Iterable --- New directory contents
 
-        :fn: function to take an entry in 'items' and return the desired file or
+        * fn: Callable --- Takes an entry in 'items' and returns the desired file or
         directory name, or None if this entry should be skipped
 
-        :same: function to compare an existing entry (a File or Directory
+        * same: Callable --- Compare an existing entry (a File or Directory
         object) with an entry in the items list to determine whether to keep
         the existing entry.
 
-        :new_entry: function to create a new directory entry (File or Directory
+        * new_entry: Callable --- Create a new directory entry (File or Directory
         object) from an entry in the items list.
 
         """
@@ -176,29 +168,43 @@ class Directory(FreshBase):
         changed = False
         for i in items:
             name = self.sanitize_filename(fn(i))
-            if name:
-                if name in oldentries and same(oldentries[name], i):
+            if not name:
+                continue
+            if name in oldentries:
+                ent = oldentries[name]
+                if same(ent, i) and ent.parent_inode == self.inode:
                     # move existing directory entry over
-                    self._entries[name] = oldentries[name]
+                    self._entries[name] = ent
                     del oldentries[name]
-                else:
-                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
-                    # create new directory entry
-                    ent = new_entry(i)
-                    if ent is not None:
-                        self._entries[name] = self.inodes.add_entry(ent)
-                        changed = True
+                    self.inodes.inode_cache.touch(ent)
+
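+        # Second pass: create entries for items that were not carried
+        # over from the old contents above.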
+        for i in items:
+            name = self.sanitize_filename(fn(i))
+            if not name:
+                continue
+            if name not in self._entries:
+                # create new directory entry
+                ent = new_entry(i)
+                if ent is not None:
+                    self._entries[name] = self.inodes.add_entry(ent)
+                    # Invalidate this name in case a previous entry
+                    # couldn't be moved over, or an earlier lookup
+                    # returned "file not found" and the kernel cached
+                    # that negative result.
+                    self.inodes.invalidate_entry(self, name)
+                    changed = True
+                _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
 
         # delete any other directory entries that were not found in 'items'
-        for i in oldentries:
-            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
-            self.inodes.invalidate_entry(self, i)
-            self.inodes.del_entry(oldentries[i])
+        for name, ent in oldentries.items():
+            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
             changed = True
 
         if changed:
-            self.inodes.invalidate_inode(self)
             self._mtime = time.time()
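+            # Contents changed, so refresh this directory's size
+            # estimate in the inode cache accounting.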
+            self.inodes.inode_cache.update_cache_size(self)
 
         self.fresh()
 
@@ -210,27 +216,27 @@ class Directory(FreshBase):
                 return True
         return False
 
-    def has_ref(self, only_children):
-        if super(Directory, self).has_ref(only_children):
-            return True
-        for v in self._entries.values():
-            if v.has_ref(False):
-                return True
-        return False
-
     def clear(self):
         """Delete all entries"""
+        if not self._entries:
+            return
         oldentries = self._entries
         self._entries = {}
-        for n in oldentries:
-            oldentries[n].clear()
-            self.inodes.del_entry(oldentries[n])
         self.invalidate()
+        for name, ent in oldentries.items():
+            ent.clear()
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
+        self.inodes.inode_cache.update_cache_size(self)
 
     def kernel_invalidate(self):
         # Invalidating the dentry on the parent implies invalidating all paths
         # below it as well.
-        parent = self.inodes[self.parent_inode]
+        if self.parent_inode in self.inodes:
+            parent = self.inodes[self.parent_inode]
+        else:
+            # parent was removed already.
+            return
 
         # Find self on the parent in order to invalidate this path.
         # Calling the public items() method might trigger a refresh,
@@ -283,9 +289,10 @@ class CollectionDirectoryBase(Directory):
 
     """
 
-    def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
-        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
-        self.apiconfig = apiconfig
+    __slots__ = ("collection", "collection_root", "collection_record_file")
+
+    def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
+        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
         self.collection = collection
         self.collection_root = collection_root
         self.collection_record_file = None
@@ -293,17 +300,16 @@ class CollectionDirectoryBase(Directory):
     def new_entry(self, name, item, mtime):
         name = self.sanitize_filename(name)
         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
-            if item.fuse_entry.dead is not True:
-                raise Exception("Can only reparent dead inode entry")
+            if item.fuse_entry.parent_inode is not None:
+                raise Exception("Can only reparent unparented inode entry")
             if item.fuse_entry.inode is None:
                 raise Exception("Reparented entry must still have valid inode")
-            item.fuse_entry.dead = False
+            item.fuse_entry.parent_inode = self.inode
             self._entries[name] = item.fuse_entry
         elif isinstance(item, arvados.collection.RichCollectionBase):
             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
                 self.inode,
                 self.inodes,
-                self.apiconfig,
                 self._enable_write,
                 self._filters,
                 item,
@@ -449,14 +455,23 @@ class CollectionDirectoryBase(Directory):
 
     def clear(self):
         super(CollectionDirectoryBase, self).clear()
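+        # Stop listening for collection change events before dropping
+        # the reference to the collection object.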
+        if self.collection is not None:
+            self.collection.unsubscribe()
         self.collection = None
 
+    def objsize(self):
+        # objsize for the whole collection is represented at the root,
+        # so don't double-count it here.
+        return 0
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
 
+    __slots__ = ("api", "num_retries", "collection_locator",
+                 "_manifest_size", "_writable", "_updating_lock")
+
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
-        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -514,7 +529,10 @@ class CollectionDirectory(CollectionDirectoryBase):
         if self.collection_record_file is not None:
             self.collection_record_file.invalidate()
             self.inodes.invalidate_inode(self.collection_record_file)
-            _logger.debug("%s invalidated collection record file", self)
+            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
+                          self.collection_record_file.inode)
+        self.inodes.update_uuid(self)
+        self.inodes.inode_cache.update_cache_size(self)
         self.fresh()
 
     def uuid(self):
@@ -592,6 +610,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         return False
 
     @use_counter
+    @check_update
     def collection_record(self):
         self.flush()
         return self.collection.api_response()
@@ -625,22 +644,32 @@ class CollectionDirectory(CollectionDirectoryBase):
         return (self.collection_locator is not None)
 
     def objsize(self):
-        # This is an empirically-derived heuristic to estimate the memory used
-        # to store this collection's metadata.  Calculating the memory
-        # footprint directly would be more accurate, but also more complicated.
-        return self._manifest_size * 128
+        # This is a rough guess of the amount of overhead involved for
+        # a collection; the assumption is that each file averages 128
+        # bytes in the manifest but consumes 1024 bytes of Python data
+        # structures, so 1024/128=8 means we estimate the RAM footprint
+        # at 8 times the size of the bare manifest text.
+        return self._manifest_size * 8
 
     def finalize(self):
-        if self.collection is not None:
-            if self.writable():
+        if self.collection is None:
+            return
+
+        if self.writable():
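+            # Log save failures rather than raising, so stop_threads()
+            # below still runs during shutdown.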
+            try:
                 self.collection.save()
-            self.collection.stop_threads()
+            except Exception as e:
+                _logger.exception("Failed to save collection %s", self.collection_locator)
+        self.collection.stop_threads()
 
     def clear(self):
         if self.collection is not None:
             self.collection.stop_threads()
-        super(CollectionDirectory, self).clear()
         self._manifest_size = 0
+        super(CollectionDirectory, self).clear()
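+        # Also release the cached collection record file inode, if one
+        # was created.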
+        if self.collection_record_file is not None:
+            self.inodes.del_entry(self.collection_record_file)
+        self.collection_record_file = None
 
 
 class TmpCollectionDirectory(CollectionDirectoryBase):
@@ -667,7 +696,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
         # This is always enable_write=True because it never tries to
         # save to the backend
         super(TmpCollectionDirectory, self).__init__(
-            parent_inode, inodes, api_client.config, True, filters, collection, self)
+            parent_inode, inodes, True, filters, collection, self)
         self.populate(self.mtime())
 
     def on_event(self, *args, **kwargs):
@@ -689,7 +718,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
                 with self.collection.lock:
                     self.collection_record_file.invalidate()
                     self.inodes.invalidate_inode(self.collection_record_file)
-                    _logger.debug("%s invalidated collection record", self)
+                    _logger.debug("%s invalidated collection record", self.inode)
         finally:
             while lockcount > 0:
                 self.collection.lock.acquire()
@@ -764,7 +793,7 @@ and the directory will appear if it exists.
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
-        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.pdh_only = pdh_only
@@ -863,7 +892,7 @@ class TagsDirectory(Directory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -943,7 +972,7 @@ class TagDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                  poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
@@ -984,9 +1013,13 @@ class TagDirectory(Directory):
 class ProjectDirectory(Directory):
     """A special directory that contains the contents of a project."""
 
+    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
+                 "project_uuid", "_updating_lock",
+                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
+
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                  project_object, poll=True, poll_time=3, storage_classes=None):
-        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
@@ -998,6 +1031,19 @@ class ProjectDirectory(Directory):
         self._current_user = None
         self._full_listing = False
         self.storage_classes = storage_classes
+        self.recursively_contained = False
+
+        # Filter groups can contain themselves, which causes tools
+        # that walk the filesystem to get stuck in an infinite loop,
+        # so suppress returning a listing in that case.
+        if self.project_object.get("group_class") == "filter":
+            iter_parent_inode = parent_inode
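+            # Walk up the chain of parent inodes looking for this same
+            # project appearing as an ancestor.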
+            while iter_parent_inode != llfuse.ROOT_INODE:
+                parent_dir = self.inodes[iter_parent_inode]
+                if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
+                    self.recursively_contained = True
+                    break
+                iter_parent_inode = parent_dir.parent_inode
 
     def want_event_subscribe(self):
         return True
@@ -1048,7 +1094,7 @@ class ProjectDirectory(Directory):
             self.project_object_file = ObjectFile(self.inode, self.project_object)
             self.inodes.add_entry(self.project_object_file)
 
-        if not self._full_listing:
+        if self.recursively_contained or not self._full_listing:
             return True
 
         def samefn(a, i):
@@ -1092,7 +1138,6 @@ class ProjectDirectory(Directory):
                         *self._filters_for('collections', qualified=True),
                     ],
                 ) if obj['current_version_uuid'] == obj['uuid'])
-
             # end with llfuse.lock_released, re-acquire lock
 
             self.merge(contents,
@@ -1175,6 +1220,12 @@ class ProjectDirectory(Directory):
     def persisted(self):
         return True
 
+    def clear(self):
+        super(ProjectDirectory, self).clear()
+        if self.project_object_file is not None:
+            self.inodes.del_entry(self.project_object_file)
+        self.project_object_file = None
+
     @use_counter
     @check_update
     def mkdir(self, name):
@@ -1294,7 +1345,7 @@ class SharedDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                  exclude, poll=False, poll_time=60, storage_classes=None):
-        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.current_user = api.users().current().execute(num_retries=num_retries)