21660: Fix test race. 21660-inode-test-race
author Tom Clegg <tom@curii.com>
Fri, 24 May 2024 17:20:43 +0000 (13:20 -0400)
committer Tom Clegg <tom@curii.com>
Fri, 24 May 2024 17:20:43 +0000 (13:20 -0400)
The test case needs to wait for the _inode_remove_queue tasks to be
completed, but the _inode_remove method was calling task_done() too
early in the case of tasks that are deferred until evict_candidates().

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/fuse/arvados_fuse/__init__.py

index c29c2430dccec4570532e22950cdb42205b5d192..c39afa4757cb0ba9a5b415bf7bdb6f7a0e1d8318 100644 (file)
@@ -299,7 +299,6 @@ class Inodes(object):
         self._inode_remove_thread.daemon = True
         self._inode_remove_thread.start()
 
-        self.cap_cache_event = threading.Event()
         self._by_uuid = collections.defaultdict(list)
 
     def __getitem__(self, item):
@@ -329,8 +328,7 @@ class Inodes(object):
 
     def cap_cache(self):
         """Notify the _inode_remove thread to recheck the cache."""
-        if not self.cap_cache_event.is_set():
-            self.cap_cache_event.set()
+        if self._inode_remove_queue.empty():
             self._inode_remove_queue.put(EvictCandidates())
 
     def update_uuid(self, entry):
@@ -385,35 +383,40 @@ class Inodes(object):
         """
 
         locked_ops = collections.deque()
-        while True:
+        shutting_down = False
+        while not shutting_down:
+            tasks_done = 0
             blocking_get = True
             while True:
                 try:
                     qentry = self._inode_remove_queue.get(blocking_get)
                 except queue.Empty:
                     break
+
                 blocking_get = False
                 if qentry is None:
-                    return
-
-                if self._shutdown_started.is_set():
+                    shutting_down = True
                     continue
 
-                # Process this entry
-                if qentry.inode_op(self, locked_ops):
-                    self._inode_remove_queue.task_done()
+                # Process (or defer) this entry
+                qentry.inode_op(self, locked_ops)
+                tasks_done += 1
 
                 # Give up the reference
                 qentry = None
 
             with llfuse.lock:
                 while locked_ops:
-                    if locked_ops.popleft().inode_op(self, None):
-                        self._inode_remove_queue.task_done()
-                self.cap_cache_event.clear()
+                    locked_ops.popleft().inode_op(self, None)
                 for entry in self.inode_cache.evict_candidates():
                     self._remove(entry)
 
+            # Unblock _inode_remove_queue.join() only when all of the
+            # deferred work is done, i.e., after calling inode_op()
+            # and then evict_candidates().
+            for _ in range(tasks_done):
+                self._inode_remove_queue.task_done()
+
     def wait_remove_queue_empty(self):
         # used by tests
         self._inode_remove_queue.join()