projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '21224-project-details'
[arvados.git]
/
services
/
fuse
/
arvados_fuse
/
__init__.py
diff --git
a/services/fuse/arvados_fuse/__init__.py
b/services/fuse/arvados_fuse/__init__.py
index d827aefab70a3292780799721766c6fea002c52e..c39afa4757cb0ba9a5b415bf7bdb6f7a0e1d8318 100644
(file)
--- a/
services/fuse/arvados_fuse/__init__.py
+++ b/
services/fuse/arvados_fuse/__init__.py
@@
-54,11
+54,6
@@
inode assigned to it and appears in the Inodes._entries dictionary.
"""
"""
-from __future__ import absolute_import
-from __future__ import division
-from builtins import next
-from builtins import str
-from builtins import object
import os
import llfuse
import errno
import os
import llfuse
import errno
@@
-304,7
+299,6
@@
class Inodes(object):
self._inode_remove_thread.daemon = True
self._inode_remove_thread.start()
self._inode_remove_thread.daemon = True
self._inode_remove_thread.start()
- self.cap_cache_event = threading.Event()
self._by_uuid = collections.defaultdict(list)
def __getitem__(self, item):
self._by_uuid = collections.defaultdict(list)
def __getitem__(self, item):
@@
-334,8
+328,7
@@
class Inodes(object):
def cap_cache(self):
"""Notify the _inode_remove thread to recheck the cache."""
def cap_cache(self):
"""Notify the _inode_remove thread to recheck the cache."""
- if not self.cap_cache_event.is_set():
- self.cap_cache_event.set()
+ if self._inode_remove_queue.empty():
self._inode_remove_queue.put(EvictCandidates())
def update_uuid(self, entry):
self._inode_remove_queue.put(EvictCandidates())
def update_uuid(self, entry):
@@
-390,35
+383,40
@@
class Inodes(object):
"""
locked_ops = collections.deque()
"""
locked_ops = collections.deque()
- while True:
+ shutting_down = False
+ while not shutting_down:
+ tasks_done = 0
blocking_get = True
while True:
try:
qentry = self._inode_remove_queue.get(blocking_get)
except queue.Empty:
break
blocking_get = True
while True:
try:
qentry = self._inode_remove_queue.get(blocking_get)
except queue.Empty:
break
+
blocking_get = False
if qentry is None:
blocking_get = False
if qentry is None:
- return
-
- if self._shutdown_started.is_set():
+ shutting_down = True
continue
continue
- # Process this entry
- if qentry.inode_op(self, locked_ops):
- self._inode_remove_queue.task_done()
+ # Process
(or defer)
this entry
+ qentry.inode_op(self, locked_ops)
+ tasks_done += 1
# Give up the reference
qentry = None
with llfuse.lock:
while locked_ops:
# Give up the reference
qentry = None
with llfuse.lock:
while locked_ops:
- if locked_ops.popleft().inode_op(self, None):
- self._inode_remove_queue.task_done()
- self.cap_cache_event.clear()
+ locked_ops.popleft().inode_op(self, None)
for entry in self.inode_cache.evict_candidates():
self._remove(entry)
for entry in self.inode_cache.evict_candidates():
self._remove(entry)
+ # Unblock _inode_remove_queue.join() only when all of the
+ # deferred work is done, i.e., after calling inode_op()
+ # and then evict_candidates().
+ for _ in range(tasks_done):
+ self._inode_remove_queue.task_done()
+
def wait_remove_queue_empty(self):
# used by tests
self._inode_remove_queue.join()
def wait_remove_queue_empty(self):
# used by tests
self._inode_remove_queue.join()