Merge branch '10584-fuse-stop-threads' refs #10584
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index aa9c4fd32ed00ee854a9eedab3acb8fbc9a59645..30ae6b40e0ae95c751ccaa1b3c0760b623d793c5 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 import logging
 import re
 import time
@@ -156,13 +160,21 @@ class Directory(FreshBase):
 
         self.fresh()
 
-    def can_clear(self):
-        if not super(Directory, self).can_clear():
-            return False
+    def in_use(self):
+        if super(Directory, self).in_use():
+            return True
         for v in self._entries.itervalues():
-            if not v.can_clear():
-                return False
-        return True
+            if v.in_use():
+                return True
+        return False
+
+    def has_ref(self, only_children):
+        if super(Directory, self).has_ref(only_children):
+            return True
+        for v in self._entries.itervalues():
+            if v.has_ref(False):
+                return True
+        return False
 
     def clear(self):
         """Delete all entries"""
@@ -170,12 +182,17 @@ class Directory(FreshBase):
         self._entries = {}
         for n in oldentries:
             oldentries[n].clear()
-        for n in oldentries:
             self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
             self.inodes.del_entry(oldentries[n])
         self.inodes.invalidate_inode(self.inode)
         self.invalidate()
 
+    def kernel_invalidate(self):
+        for n, e in self._entries.iteritems():
+            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+            e.kernel_invalidate()
+        self.inodes.invalidate_inode(self.inode)
+
     def mtime(self):
         return self._mtime
 
@@ -322,9 +339,8 @@ class CollectionDirectoryBase(Directory):
         src.flush()
 
     def clear(self):
-        r = super(CollectionDirectoryBase, self).clear()
+        super(CollectionDirectoryBase, self).clear()
         self.collection = None
-        return r
 
 
 class CollectionDirectory(CollectionDirectoryBase):
@@ -494,6 +510,12 @@ class CollectionDirectory(CollectionDirectoryBase):
                 self.collection.save()
             self.collection.stop_threads()
 
+    def clear(self):
+        if self.collection is not None:
+            self.collection.stop_threads()
+        super(CollectionDirectory, self).clear()
+        self._manifest_size = 0
+
 
 class TmpCollectionDirectory(CollectionDirectoryBase):
     """A directory backed by an Arvados collection that never gets saved.
@@ -648,17 +670,7 @@ will appear if it exists.
         return not self.pdh_only
 
 
-class RecursiveInvalidateDirectory(Directory):
-    def invalidate(self):
-        try:
-            super(RecursiveInvalidateDirectory, self).invalidate()
-            for a in self._entries:
-                self._entries[a].invalidate()
-        except Exception:
-            _logger.exception()
-
-
-class TagsDirectory(RecursiveInvalidateDirectory):
+class TagsDirectory(Directory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
@@ -667,6 +679,7 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self.num_retries = num_retries
         self._poll = True
         self._poll_time = poll_time
+        self._extra = set()
 
     def want_event_subscribe(self):
         return True
@@ -675,15 +688,41 @@ class TagsDirectory(RecursiveInvalidateDirectory):
     def update(self):
         with llfuse.lock_released:
             tags = self.api.links().list(
-                filters=[['link_class', '=', 'tag']],
-                select=['name'], distinct=True
+                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
+                select=['name'], distinct=True, limit=1000
                 ).execute(num_retries=self.num_retries)
         if "items" in tags:
-            self.merge(tags['items'],
+            self.merge(tags['items']+[{"name": n} for n in self._extra],
                        lambda i: i['name'],
                        lambda a, i: a.tag == i['name'],
                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
 
+    @use_counter
+    @check_update
+    def __getitem__(self, item):
+        if super(TagsDirectory, self).__contains__(item):
+            return super(TagsDirectory, self).__getitem__(item)
+        with llfuse.lock_released:
+            tags = self.api.links().list(
+                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
+            ).execute(num_retries=self.num_retries)
+        if tags["items"]:
+            self._extra.add(item)
+            self.update()
+        return super(TagsDirectory, self).__getitem__(item)
+
+    @use_counter
+    @check_update
+    def __contains__(self, k):
+        if super(TagsDirectory, self).__contains__(k):
+            return True
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
+        return False
+
 
 class TagDirectory(Directory):
     """A special directory that contains as subdirectories all collections visible