class KeepBlockCache(object):
def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
self.cache_max = cache_max
- self._cache = []
+ self._cache = collections.OrderedDict()
self._cache_lock = threading.Lock()
self._max_slots = max_slots
self._disk_cache = disk_cache
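# A standalone sketch (not part of the patch) of the LRU bookkeeping that
# switching self._cache from a list to collections.OrderedDict provides:
# insertion appends at the end, move_to_end() refreshes recency on access,
# and iterating visits the least recently used entries first.
import collections

lru = collections.OrderedDict()
lru["a"] = 1
lru["b"] = 2
lru.move_to_end("a")        # "a" becomes the most recently used
list(lru)                   # ["b", "a"]: least recently used first
lru.popitem(last=False)     # evicts ("b", 2), the least recently used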
def _resize_cache(self, cache_max, max_slots):
# Try to make sure the contents of the cache do not exceed
# the supplied maximums.
- # Select all slots except those where ready.is_set() and content is
- # None (that means there was an error reading the block).
- self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
- sm = sum([slot.size() for slot in self._cache])
+ _evict_candidates = collections.deque(self._cache.values())
+ sm = sum([slot.size() for slot in _evict_candidates])
- while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+ # Guard on the candidate deque: unevictable slots stay in self._cache,
+ # so guarding on the cache alone could popleft() an empty deque.
+ while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
- for i in range(len(self._cache)-1, -1, -1):
- # start from the back, find a slot that is a candidate to evict
- if self._cache[i].ready.is_set():
- sz = self._cache[i].size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if self._cache[i].evict():
- sm -= sz
-
- # check to make sure the underlying data is gone
- if self._cache[i].gone():
- # either way we forget about it. either the
- # other process will delete it, or if we need
- # it again and it is still there, we'll find
- # it on disk.
- del self._cache[i]
- break
+ slot = _evict_candidates.popleft()
+ if not slot.ready.is_set():
+ # Still being fetched; can't evict it yet.
+ continue
+
+ if slot.content is None:
+ # ready is set but content is None, which means there
+ # was an error reading the block; drop it from the cache.
+ del self._cache[slot.locator]
+ continue
+
+ sz = slot.size()
+
+ # If evict() returns False, it means the underlying
+ # disk cache couldn't lock the file for deletion
+ # because another process was using it. Don't count
+ # it as reducing the amount of data in the cache;
+ # find something else to throw out.
+ if slot.evict():
+ sm -= sz
+
+ # Check that the underlying data is really gone.
+ if slot.gone():
+ # Either way we forget about it: either the
+ # other process will delete it, or, if we need
+ # it again and it is still there, we'll find
+ # it on disk.
+ del self._cache[slot.locator]
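# Why the eviction loop snapshots the slots into a deque (a sketch, not part
# of the patch): deleting from a dict while iterating it directly raises
# RuntimeError, but iterating a snapshot while deleting from the dict is safe.
import collections

d = collections.OrderedDict(a=1, b=2)
for key in collections.deque(d):    # snapshot of the keys, oldest first
    del d[key]                      # safe: only the dict is mutated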
def cap_cache(self):
def _get(self, locator):
# Test if the locator is already in the cache
- for i in range(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n
+ if locator in self._cache:
+ n = self._cache[locator]
+ # Refresh recency: most recently used entries live at the end.
+ self._cache.move_to_end(locator)
+ return n
if self._disk_cache:
# see if it exists on disk
n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
if n is not None:
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
return n
return None
n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
else:
n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
return n, True
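# A self-contained sketch of the handshake a reserved slot enables, mirroring
# the slot fields used above (locator, a threading.Event named ready, content)
# and the set() call seen in KeepBlockCache.set(); the get() that waits on
# ready is assumed here, not shown in this hunk. The first caller to reserve
# a locator fills the slot; concurrent readers block until the content lands.
import threading

class CacheSlot:
    def __init__(self, locator):
        self.locator = locator
        self.ready = threading.Event()
        self.content = None

    def set(self, value):
        self.content = value
        self.ready.set()

    def get(self):
        self.ready.wait()           # block until a writer calls set()
        return self.content

slot = CacheSlot("acbd18db")        # hypothetical locator
writer = threading.Thread(target=slot.set, args=(b"block data",))
writer.start()
print(slot.get())                   # waits for the writer, prints b'block data'
writer.join()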
def set(self, slot, blob):
elif e.errno == errno.ENOSPC:
# Reduce disk max space to current - 256 MiB, cap cache and retry
with self._cache_lock:
- sm = sum([st.size() for st in self._cache])
+ sm = sum([st.size() for st in self._cache.values()])
self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
elif e.errno == errno.ENODEV:
_logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
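# A quick illustration (with assumed usage numbers) of the ENOSPC backoff
# above: shrink the cache limit to 256 MiB below current usage, but never
# let it fall below 256 MiB.
MiB = 1024 * 1024
sm = 900 * MiB                               # assumed current cache usage
cache_max = max(256 * MiB, sm - 256 * MiB)   # -> 644 MiB
sm = 100 * MiB                               # nearly empty cache...
cache_max = max(256 * MiB, sm - 256 * MiB)   # -> floors at 256 MiB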
does not block.
"""
+ # The block is already in the cache, so there is nothing to prefetch.
+ if self.block_cache.get(locator) is not None:
+ return
+
self._start_prefetch_threads()
self._prefetch_queue.put(locator)
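# A minimal sketch of the prefetch pattern the last two lines suggest: a
# shared queue fed by the caller and drained by daemon worker threads. The
# worker count, fetch_into_cache(), and the join() are assumptions for this
# demo, not the real _start_prefetch_threads() implementation.
import queue
import threading

def fetch_into_cache(locator):
    # Stand-in for the real block download; assumed for this sketch.
    print("prefetching", locator)

prefetch_queue = queue.Queue()

def prefetch_worker():
    while True:
        fetch_into_cache(prefetch_queue.get())
        prefetch_queue.task_done()

for _ in range(2):
    threading.Thread(target=prefetch_worker, daemon=True).start()

prefetch_queue.put("locator-1")   # like block_prefetch(): enqueue and return
prefetch_queue.join()             # demo only; the real caller never blocks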