self._inode_remove_thread.daemon = True
self._inode_remove_thread.start()
- self.cap_cache_event = threading.Event()
self._by_uuid = collections.defaultdict(list)
def __getitem__(self, item):
def cap_cache(self):
"""Notify the _inode_remove thread to recheck the cache."""
# Removed by this change: the guard that used the cap_cache_event flag
# (enqueue only if the event was not already set, then set it).
- if not self.cap_cache_event.is_set():
- self.cap_cache_event.set()
# Added by this change: the guard is now the queue itself. The
# EvictCandidates() item below is enqueued only when
# _inode_remove_queue reports empty, i.e. when nothing else is waiting
# to be picked up by the consumer.
# NOTE(review): empty() followed by put() is not atomic; presumably an
# occasional extra EvictCandidates item is harmless -- confirm.
+ if self._inode_remove_queue.empty():
self._inode_remove_queue.put(EvictCandidates())
def update_uuid(self, entry):
"""
locked_ops = collections.deque()
- while True:
+ shutting_down = False
+ while not shutting_down:
+ tasks_done = 0
blocking_get = True
while True:
try:
qentry = self._inode_remove_queue.get(blocking_get)
except queue.Empty:
break
+
blocking_get = False
if qentry is None:
- return
-
- if self._shutdown_started.is_set():
+ shutting_down = True
continue
- # Process this entry
- if qentry.inode_op(self, locked_ops):
- self._inode_remove_queue.task_done()
+ # Process (or defer) this entry
+ qentry.inode_op(self, locked_ops)
+ tasks_done += 1
# Give up the reference
qentry = None
with llfuse.lock:
while locked_ops:
- if locked_ops.popleft().inode_op(self, None):
- self._inode_remove_queue.task_done()
- self.cap_cache_event.clear()
+ locked_ops.popleft().inode_op(self, None)
for entry in self.inode_cache.evict_candidates():
self._remove(entry)
+ # Unblock _inode_remove_queue.join() only when all of the
+ # deferred work is done, i.e., after calling inode_op()
+ # and then evict_candidates().
+ for _ in range(tasks_done):
+ self._inode_remove_queue.task_done()
+
def wait_remove_queue_empty(self):
# used by tests
# Blocks the caller on _inode_remove_queue.join(). Assuming this is a
# standard queue.Queue (TODO confirm), join() returns only once every
# item that was put() has been matched by a task_done() call.
self._inode_remove_queue.join()