Merge branch '10584-fuse-stop-threads' refs #10584
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 3f2bcd5ec2464fb5351a42be205b17f1fe93e49c..30ae6b40e0ae95c751ccaa1b3c0760b623d793c5 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 import logging
 import re
 import time
@@ -156,24 +160,38 @@ class Directory(FreshBase):
 
         self.fresh()
 
-    def clear(self, force=False):
-        """Delete all entries"""
+    def in_use(self):
+        if super(Directory, self).in_use():
+            return True
+        for v in self._entries.itervalues():
+            if v.in_use():
+                return True
+        return False
 
-        if not self.in_use() or force:
-            oldentries = self._entries
-            self._entries = {}
-            for n in oldentries:
-                if not oldentries[n].clear(force):
-                    self._entries = oldentries
-                    return False
-            for n in oldentries:
-                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
-                self.inodes.del_entry(oldentries[n])
-            self.inodes.invalidate_inode(self.inode)
-            self.invalidate()
+    def has_ref(self, only_children):
+        if super(Directory, self).has_ref(only_children):
             return True
-        else:
-            return False
+        for v in self._entries.itervalues():
+            if v.has_ref(False):
+                return True
+        return False
+
+    def clear(self):
+        """Delete all entries"""
+        oldentries = self._entries
+        self._entries = {}
+        for n in oldentries:
+            oldentries[n].clear()
+            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+            self.inodes.del_entry(oldentries[n])
+        self.inodes.invalidate_inode(self.inode)
+        self.invalidate()
+
+    def kernel_invalidate(self):
+        for n, e in self._entries.iteritems():
+            self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+            e.kernel_invalidate()
+        self.inodes.invalidate_inode(self.inode)
 
     def mtime(self):
         return self._mtime
@@ -320,10 +338,9 @@ class CollectionDirectoryBase(Directory):
         self.flush()
         src.flush()
 
-    def clear(self, force=False):
-        r = super(CollectionDirectoryBase, self).clear(force)
+    def clear(self):
+        super(CollectionDirectoryBase, self).clear()
         self.collection = None
-        return r
 
 
 class CollectionDirectory(CollectionDirectoryBase):
@@ -375,7 +392,7 @@ class CollectionDirectory(CollectionDirectoryBase):
 
     def new_collection(self, new_collection_record, coll_reader):
         if self.inode:
-            self.clear(force=True)
+            self.clear()
 
         self.collection_record = new_collection_record
 
@@ -407,7 +424,7 @@ class CollectionDirectory(CollectionDirectoryBase):
                     if not self.stale():
                         return
 
-                    _logger.debug("Updating %s", to_record_version)
+                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                     if self.collection is not None:
                         if self.collection.known_past_version(to_record_version):
                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
@@ -493,6 +510,12 @@ class CollectionDirectory(CollectionDirectoryBase):
                 self.collection.save()
             self.collection.stop_threads()
 
+    def clear(self):
+        if self.collection is not None:
+            self.collection.stop_threads()
+        super(CollectionDirectory, self).clear()
+        self._manifest_size = 0
+
 
 class TmpCollectionDirectory(CollectionDirectoryBase):
     """A directory backed by an Arvados collection that never gets saved.
@@ -640,24 +663,14 @@ will appear if it exists.
         else:
             raise KeyError("No collection with id " + item)
 
-    def clear(self, force=False):
+    def clear(self):
         pass
 
     def want_event_subscribe(self):
         return not self.pdh_only
 
 
-class RecursiveInvalidateDirectory(Directory):
-    def invalidate(self):
-        try:
-            super(RecursiveInvalidateDirectory, self).invalidate()
-            for a in self._entries:
-                self._entries[a].invalidate()
-        except Exception:
-            _logger.exception()
-
-
-class TagsDirectory(RecursiveInvalidateDirectory):
+class TagsDirectory(Directory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
@@ -666,6 +679,7 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self.num_retries = num_retries
         self._poll = True
         self._poll_time = poll_time
+        self._extra = set()
 
     def want_event_subscribe(self):
         return True
@@ -674,15 +688,41 @@ class TagsDirectory(RecursiveInvalidateDirectory):
     def update(self):
         with llfuse.lock_released:
             tags = self.api.links().list(
-                filters=[['link_class', '=', 'tag']],
-                select=['name'], distinct=True
+                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
+                select=['name'], distinct=True, limit=1000
                 ).execute(num_retries=self.num_retries)
         if "items" in tags:
-            self.merge(tags['items'],
+            self.merge(tags['items']+[{"name": n} for n in self._extra],
                        lambda i: i['name'],
                        lambda a, i: a.tag == i['name'],
                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
 
+    @use_counter
+    @check_update
+    def __getitem__(self, item):
+        if super(TagsDirectory, self).__contains__(item):
+            return super(TagsDirectory, self).__getitem__(item)
+        with llfuse.lock_released:
+            tags = self.api.links().list(
+                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
+            ).execute(num_retries=self.num_retries)
+        if tags["items"]:
+            self._extra.add(item)
+            self.update()
+        return super(TagsDirectory, self).__getitem__(item)
+
+    @use_counter
+    @check_update
+    def __contains__(self, k):
+        if super(TagsDirectory, self).__contains__(k):
+            return True
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
+        return False
+
 
 class TagDirectory(Directory):
     """A special directory that contains as subdirectories all collections visible