21020: Remove configuration path from Python READMEs
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index aeb0ce4ba7426103e3746d27e05a21cc34912fe4..c39afa4757cb0ba9a5b415bf7bdb6f7a0e1d8318 100644 (file)
@@ -54,11 +54,6 @@ inode assigned to it and appears in the Inodes._entries dictionary.
 
 """
 
-from __future__ import absolute_import
-from __future__ import division
-from builtins import next
-from builtins import str
-from builtins import object
 import os
 import llfuse
 import errno
@@ -77,7 +72,6 @@ from prometheus_client import Summary
 import queue
 from dataclasses import dataclass
 import typing
-import gc
 
 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
 from .fusefile import File, StringFile, FuseArvadosFile
@@ -154,6 +148,9 @@ class InodeCache(object):
     """
 
     def __init__(self, cap, min_entries=4):
+        # Standard dictionaries preserve insertion order, but OrderedDict is still
+        # the better fit here: we rely on move_to_end(), which standard dicts lack. See
+        # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
         self._cache_entries = collections.OrderedDict()
         self.cap = cap
         self._total = 0
@@ -188,7 +185,7 @@ class InodeCache(object):
         # "values"
         values = collections.deque(self._cache_entries.values())
 
-        while len(values) > 0:
+        while values:
             if self._total < self.cap or len(self._cache_entries) < self.min_entries:
                 break
             yield values.popleft()
@@ -302,7 +299,6 @@ class Inodes(object):
         self._inode_remove_thread.daemon = True
         self._inode_remove_thread.start()
 
-        self.cap_cache_event = threading.Event()
         self._by_uuid = collections.defaultdict(list)
 
     def __getitem__(self, item):
@@ -332,8 +328,7 @@ class Inodes(object):
 
     def cap_cache(self):
         """Notify the _inode_remove thread to recheck the cache."""
-        if not self.cap_cache_event.is_set():
-            self.cap_cache_event.set()
+        if self._inode_remove_queue.empty():
             self._inode_remove_queue.put(EvictCandidates())
 
     def update_uuid(self, entry):
@@ -388,35 +383,40 @@ class Inodes(object):
         """
 
         locked_ops = collections.deque()
-        while True:
+        shutting_down = False
+        while not shutting_down:
+            tasks_done = 0
             blocking_get = True
             while True:
                 try:
                     qentry = self._inode_remove_queue.get(blocking_get)
                 except queue.Empty:
                     break
+
                 blocking_get = False
                 if qentry is None:
-                    return
-
-                if self._shutdown_started.is_set():
+                    shutting_down = True
                     continue
 
-                # Process this entry
-                if qentry.inode_op(self, locked_ops):
-                    self._inode_remove_queue.task_done()
+                # Process (or defer) this entry
+                qentry.inode_op(self, locked_ops)
+                tasks_done += 1
 
                 # Give up the reference
                 qentry = None
 
             with llfuse.lock:
-                while len(locked_ops) > 0:
-                    if locked_ops.popleft().inode_op(self, None):
-                        self._inode_remove_queue.task_done()
-                self.cap_cache_event.clear()
+                while locked_ops:
+                    locked_ops.popleft().inode_op(self, None)
                 for entry in self.inode_cache.evict_candidates():
                     self._remove(entry)
 
+            # Unblock _inode_remove_queue.join() only when all of the
+            # deferred work is done, i.e., after calling inode_op()
+            # and then evict_candidates().
+            for _ in range(tasks_done):
+                self._inode_remove_queue.task_done()
+
     def wait_remove_queue_empty(self):
         # used by tests
         self._inode_remove_queue.join()
@@ -627,13 +627,6 @@ class Operations(llfuse.Operations):
 
         self.events = None
 
-        # We rely on the cyclic garbage collector to deallocate
-        # Collection objects from the Python SDK.  A lower GC
-        # threshold encourages Python to be more aggressive in
-        # reclaiming these and seems to slow down the growth in memory
-        # usage over time.
-        gc.set_threshold(200)
-
     def init(self):
         # Allow threads that are waiting for the driver to be finished
         # initializing to continue