"""
-from __future__ import absolute_import
-from __future__ import division
-from builtins import next
-from builtins import str
-from builtins import object
import os
import llfuse
import errno
import queue
from dataclasses import dataclass
import typing
-import gc
from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
from .fusefile import File, StringFile, FuseArvadosFile
"""
def __init__(self, cap, min_entries=4):
# NOTE(review): this block appears to be a mangled unified-diff fragment —
# lines prefixed with '+'/'-' are patch markers, and statements from at
# least three different methods have been interleaved.  Do not treat it as
# runnable Python; recover the original hunks from version control before
# editing further.
+ # Standard dictionaries are ordered, but OrderedDict is still better here, see
+ # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
+ # specifically we use move_to_end() which standard dicts don't have.
self._cache_entries = collections.OrderedDict()
self.cap = cap
self._total = 0
# "values"
# NOTE(review): the deque / while / yield lines below belong to an
# eviction-candidate generator (presumably evict_candidates()), not to
# __init__ — confirm against the full file.
values = collections.deque(self._cache_entries.values())
- while len(values) > 0:
+ while values:
if self._total < self.cap or len(self._cache_entries) < self.min_entries:
break
yield values.popleft()
# NOTE(review): the thread-startup lines below come from a different
# __init__ (the inode-remove thread owner); the '-' line records the
# removal of the cap_cache_event field in this patch.
self._inode_remove_thread.daemon = True
self._inode_remove_thread.start()
- self.cap_cache_event = threading.Event()
self._by_uuid = collections.defaultdict(list)
def __getitem__(self, item):
def cap_cache(self):
    """Notify the _inode_remove thread to recheck the cache.

    Puts an EvictCandidates sentinel on the inode-remove queue so the
    background worker re-evaluates cache occupancy and evicts entries
    as needed.  If the queue already holds pending work, nothing is
    enqueued: the worker rechecks eviction candidates after draining
    the queue anyway, so an extra sentinel would be redundant.
    """
    # Leftover '+'/'-' diff markers resolved here to the added-side
    # (queue-based) implementation; the removed cap_cache_event logic
    # is gone.
    if self._inode_remove_queue.empty():
        self._inode_remove_queue.put(EvictCandidates())
def update_uuid(self, entry):
"""
locked_ops = collections.deque()
- while True:
+ shutting_down = False
+ while not shutting_down:
+ tasks_done = 0
blocking_get = True
while True:
try:
qentry = self._inode_remove_queue.get(blocking_get)
except queue.Empty:
break
+
blocking_get = False
if qentry is None:
- return
-
- if self._shutdown_started.is_set():
+ shutting_down = True
continue
- # Process this entry
- if qentry.inode_op(self, locked_ops):
- self._inode_remove_queue.task_done()
+ # Process (or defer) this entry
+ qentry.inode_op(self, locked_ops)
+ tasks_done += 1
# Give up the reference
qentry = None
with llfuse.lock:
- while len(locked_ops) > 0:
- if locked_ops.popleft().inode_op(self, None):
- self._inode_remove_queue.task_done()
- self.cap_cache_event.clear()
+ while locked_ops:
+ locked_ops.popleft().inode_op(self, None)
for entry in self.inode_cache.evict_candidates():
self._remove(entry)
+ # Unblock _inode_remove_queue.join() only when all of the
+ # deferred work is done, i.e., after calling inode_op()
+ # and then evict_candidates().
+ for _ in range(tasks_done):
+ self._inode_remove_queue.task_done()
+
def wait_remove_queue_empty(self):
    """Block until every queued inode-removal task has been processed.

    Test helper: delegates to Queue.join(), which returns once
    task_done() has been called for each item ever put on the queue.
    """
    pending_queue = self._inode_remove_queue
    pending_queue.join()
self.write_ops_counter = arvados.keep.Counter()
self.events = None
-l
- # We rely on the cyclic garbage collector to deallocate
- # Collection objects from the Python SDK. A lower GC
- # threshold encourages Python to be more aggressive in
- # reclaiming these and seems to slow down the growth in memory
- # usage over time.
- gc.set_threshold(200)
def init(self):
# Allow threads that are waiting for the driver to be finished