class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """LRU cache of Keep blocks keyed by locator.

        :cache_max: soft limit on total cached bytes; floored at 64 MiB.
        :max_slots: maximum number of cached blocks.
        :disk_cache: if True, back the cache with files on disk instead of RAM.
        :disk_cache_dir: directory holding the disk cache files.
        """
        self.cache_max = cache_max
        # OrderedDict gives O(1) lookup by locator plus LRU ordering:
        # hits are moved to the end, eviction pops from the front.
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        # Bug fix: the parameter was accepted but never stored, while
        # self._disk_cache_dir is read below (and by _get/reserve paths).
        self._disk_cache_dir = disk_cache_dir
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
        # Running byte total of cached content; kept in sync by set(),
        # _get() and _resize_cache() instead of re-summing every slot.
        self.cache_total = 0
        if self._disk_cache:
            # Adopt blocks left on disk by previous runs / other processes,
            # then trim back down to the configured limits.
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()
class CacheSlot(object):
    """In-memory cache entry for one Keep block.

    `ready` is set once the fetch finished (successfully or not); a
    ready slot with content None means the read failed.
    """
    __slots__ = ("locator", "ready", "content")

    def __init__(self, locator):
        self.locator = locator
        self.ready = threading.Event()
        self.content = None

    def get(self):
        # Block until the slot has been filled by set().
        self.ready.wait()
        return self.content

    def set(self, value):
        # Refuse to overwrite an already-filled slot: doing so would
        # corrupt the cache's byte accounting.  Returning False lets the
        # caller know the slot's size did not change.
        if self.content is not None:
            return False
        self.content = value
        self.ready.set()
        return True

    def size(self):
        if self.content is None:
            return 0
        else:
            return len(self.content)

    def evict(self):
        # In-memory eviction always succeeds; return True to mirror the
        # DiskCacheSlot.evict() contract checked by _resize_cache().
        self.content = None
        return True
def _resize_cache(self, cache_max, max_slots):
    """Evict blocks until the cache fits within the supplied limits.

    Caller must hold self._cache_lock.  Slots are considered in
    insertion order (least recently used first, since hits are moved
    to the end) and only slots whose fetch has completed are evicted.
    """
    # Try and make sure the contents of the cache do not exceed
    # the supplied maximums.
    if self.cache_total <= cache_max and len(self._cache) <= max_slots:
        return

    # Snapshot the candidates so we can delete from self._cache while
    # walking; deque.popleft() keeps this O(1) per step in LRU order.
    _evict_candidates = collections.deque(self._cache.values())
    while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
        slot = _evict_candidates.popleft()
        if not slot.ready.is_set():
            # Still being fetched; another thread may be waiting on this
            # slot, so it is not a candidate for eviction.
            continue

        sz = slot.size()

        # If evict returns false it means the underlying disk cache
        # couldn't lock the file for deletion because another process
        # was using it.  Don't count it as reducing the amount of data
        # in the cache.
        if slot.evict():
            self.cache_total -= sz

        # Either way we forget about it: either the other process will
        # delete it, or if we need it again and it is still there,
        # we'll find it on disk.
        del self._cache[slot.locator]
# NOTE(review): body of cap_cache() is not visible in this hunk; it
# presumably takes self._cache_lock and calls
# self._resize_cache(self.cache_max, self._max_slots) — confirm upstream.
def cap_cache(self):
def _get(self, locator):
    """Return the cache slot for *locator*, or None on a miss.

    Caller must hold self._cache_lock.  A hit is moved to the
    most-recently-used end of the OrderedDict.  A ready slot with no
    content (a failed read) is dropped so a later request can retry.
    When disk caching is enabled, a miss falls back to probing the
    disk for a block written by another process.
    """
    # Test if the locator is already in the cache
    if locator in self._cache:
        n = self._cache[locator]
        if n.ready.is_set() and n.content is None:
            # The previous fetch errored out; discard the slot.
            del self._cache[n.locator]
            return None
        # Cache hit: refresh LRU position.
        self._cache.move_to_end(locator)
        return n
    if self._disk_cache:
        # see if it exists on disk
        n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
        if n is not None:
            # Adopt the on-disk block into the in-process index and
            # count its bytes toward the cache total.
            self._cache[n.locator] = n
            self.cache_total += n.size()
            return n
    return None
# NOTE(review): this is the tail of the reserve path; the enclosing def
# and the `if self._disk_cache:` branch header are not visible in this
# hunk.  On a miss it creates a fresh (empty) slot, indexes it by
# locator, and the True in the return tuple presumably signals "caller
# must fetch and fill this slot" — confirm against the full method.
n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
else:
n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
return n, True
# Store *blob* into *slot*, keeping self.cache_total in sync.
# NOTE(review): this hunk is incomplete — the ENOMEM branch body, the
# finally/cap_cache() calls, and the header of the second (retry) try
# block are elided.  Do not resolve the markers below without the full
# method in view.
def set(self, slot, blob):
try:
# slot.set() returns False when the slot was already filled, in
# which case its size was already counted — only add on success.
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
return
except OSError as e:
if e.errno == errno.ENOMEM:
elif e.errno == errno.ENOSPC:
# Reduce disk max space to current - 256 MiB, cap cache and retry
with self._cache_lock:
- sm = sum([st.size() for st in self._cache])
+ sm = sum(st.size() for st in self._cache.values())
self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
elif e.errno == errno.ENODEV:
_logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
# exception handler adjusts limits downward in some cases
# to free up resources, which would make the operation
# succeed.
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
except Exception as e:
# It failed again. Give up.
slot.set(None)
# NOTE(review): mid-__init__ fragment; the enclosing def starts before
# this hunk.
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
# The `or 2` form silently replaced an explicit 0 with 2; the explicit
# None check below preserves num_prefetch_threads=0 as a valid setting.
- self.num_prefetch_threads = num_prefetch_threads or 2
+ if num_prefetch_threads is not None:
+ self.num_prefetch_threads = num_prefetch_threads
+ else:
+ self.num_prefetch_threads = 2
self._prefetch_queue = None
self._prefetch_threads = None
does not block.
"""
# Skip the prefetch entirely when the block is already cached —
# avoids spinning up prefetch threads and queueing useless work.
+ if self.block_cache.get(locator) is not None:
+ return
+
self._start_prefetch_threads()
self._prefetch_queue.put(locator)