From f254e6a4e1593a4ade5176e8f283162fbd9af8f2 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 24 May 2024 13:20:43 -0400 Subject: [PATCH] 21660: Fix test race. The test case needs to wait for the _inode_remove_queue tasks to be completed, but _inode_remove method was calling task_done() too early in the case of tasks that are deferred until evict_candidates. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/fuse/arvados_fuse/__init__.py | 29 ++++++++++++++------------ 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index c29c2430dc..c39afa4757 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -299,7 +299,6 @@ class Inodes(object): self._inode_remove_thread.daemon = True self._inode_remove_thread.start() - self.cap_cache_event = threading.Event() self._by_uuid = collections.defaultdict(list) def __getitem__(self, item): @@ -329,8 +328,7 @@ class Inodes(object): def cap_cache(self): """Notify the _inode_remove thread to recheck the cache.""" - if not self.cap_cache_event.is_set(): - self.cap_cache_event.set() + if self._inode_remove_queue.empty(): self._inode_remove_queue.put(EvictCandidates()) def update_uuid(self, entry): @@ -385,35 +383,40 @@ class Inodes(object): """ locked_ops = collections.deque() - while True: + shutting_down = False + while not shutting_down: + tasks_done = 0 blocking_get = True while True: try: qentry = self._inode_remove_queue.get(blocking_get) except queue.Empty: break + blocking_get = False if qentry is None: - return - - if self._shutdown_started.is_set(): + shutting_down = True continue - # Process this entry - if qentry.inode_op(self, locked_ops): - self._inode_remove_queue.task_done() + # Process (or defer) this entry + qentry.inode_op(self, locked_ops) + tasks_done += 1 # Give up the reference qentry = None with llfuse.lock: while locked_ops: - if locked_ops.popleft().inode_op(self, None): - self._inode_remove_queue.task_done() - self.cap_cache_event.clear() + locked_ops.popleft().inode_op(self, None) for entry in self.inode_cache.evict_candidates(): self._remove(entry) + # Unblock _inode_remove_queue.join() only when all of the + # deferred work is done, i.e., after calling inode_op() + # and then evict_candidates(). + for _ in range(tasks_done): + self._inode_remove_queue.task_done() + def wait_remove_queue_empty(self): # used by tests self._inode_remove_queue.join() -- 2.30.2