X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/873fcf181c037cc1e42419bfeaf5bb70c9d9e239..89fc1810ba39a7a0aebccba690c7bc663bca8c0b:/services/fuse/arvados_fuse/__init__.py diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index c29c2430dc..c39afa4757 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -299,7 +299,6 @@ class Inodes(object): self._inode_remove_thread.daemon = True self._inode_remove_thread.start() - self.cap_cache_event = threading.Event() self._by_uuid = collections.defaultdict(list) def __getitem__(self, item): @@ -329,8 +328,7 @@ class Inodes(object): def cap_cache(self): """Notify the _inode_remove thread to recheck the cache.""" - if not self.cap_cache_event.is_set(): - self.cap_cache_event.set() + if self._inode_remove_queue.empty(): self._inode_remove_queue.put(EvictCandidates()) def update_uuid(self, entry): @@ -385,35 +383,40 @@ class Inodes(object): """ locked_ops = collections.deque() - while True: + shutting_down = False + while not shutting_down: + tasks_done = 0 blocking_get = True while True: try: qentry = self._inode_remove_queue.get(blocking_get) except queue.Empty: break + blocking_get = False if qentry is None: - return - - if self._shutdown_started.is_set(): + shutting_down = True continue - # Process this entry - if qentry.inode_op(self, locked_ops): - self._inode_remove_queue.task_done() + # Process (or defer) this entry + qentry.inode_op(self, locked_ops) + tasks_done += 1 # Give up the reference qentry = None with llfuse.lock: while locked_ops: - if locked_ops.popleft().inode_op(self, None): - self._inode_remove_queue.task_done() - self.cap_cache_event.clear() + locked_ops.popleft().inode_op(self, None) for entry in self.inode_cache.evict_candidates(): self._remove(entry) + # Unblock _inode_remove_queue.join() only when all of the + # deferred work is done, i.e., after calling inode_op() + # and then evict_candidates(). + for _ in range(tasks_done): + self._inode_remove_queue.task_done() + def wait_remove_queue_empty(self): # used by tests self._inode_remove_queue.join()