self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+ self.cache_total = 0
if self._disk_cache:
self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+ for slot in self._cache.values():
+ self.cache_total += slot.size()
self.cap_cache()
-
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
return self.content
def set(self, value):
# Store `value` as this slot's content and set the `ready` event.
# The first stored content wins: if content is already present the
# slot is left untouched and False is returned. Otherwise the value
# is stored, `ready` is set, and True is returned, so the caller can
# tell whether this call actually changed the slot.
# Calling with value=None on an empty slot still sets `ready` and
# returns True.
+ if self.content is not None:
+ return False
self.content = value
self.ready.set()
+ return True
def size(self):
if self.content is None:
def evict(self):
# Drop this slot's reference to its content.
# After this change the method has no explicit return value (the
# `return self.gone()` line and the gone() helper are removed), so
# callers can no longer test the result of evict().
self.content = None
- return self.gone()
- def gone(self):
- return (self.content is None)
def _resize_cache(self, cache_max, max_slots):
# Try and make sure the contents of the cache do not exceed
# the supplied maximums.
#
# cache_max is compared against self.cache_total (the running size
# total maintained elsewhere) and max_slots against len(self._cache).
# Fast path: both limits are already satisfied, so nothing is evicted.
+ if self.cache_total <= cache_max and len(self._cache) <= max_slots:
+ return
+
# Take a snapshot of the slots so self._cache can be modified while
# candidates are consumed from the left of the deque.
# NOTE(review): eviction order is whatever order self._cache.values()
# yields; presumably oldest-first -- confirm against how _cache is
# maintained.
_evict_candidates = collections.deque(self._cache.values())
- sm = sum([slot.size() for slot in _evict_candidates])
- while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
# Keep evicting until both limits hold or no candidates remain.
+ while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
slot = _evict_candidates.popleft()
# Slots whose `ready` event is not set are skipped and left in
# the cache (presumably still being filled -- confirm).
if not slot.ready.is_set():
continue
- if slot.content is None:
- # error
- del self._cache[slot.locator]
- continue
-
# Read the size before evicting so the amount subtracted from
# cache_total is the size the slot had while cached.
sz = slot.size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if slot.evict():
- sm -= sz
-
- # check to make sure the underlying data is gone
- if slot.gone():
- # either way we forget about it. either the
- # other process will delete it, or if we need
- # it again and it is still there, we'll find
- # it on disk.
- del self._cache[slot.locator]
# NOTE(review): the removed code only subtracted the size when
# evict() returned true; the new code always subtracts it and
# always forgets the slot. Confirm that eviction of disk-backed
# slots cannot fail in a way that leaves cache_total too low.
+ slot.evict()
+ self.cache_total -= sz
+ del self._cache[slot.locator]
def cap_cache(self):
n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
if n is not None:
self._cache[n.locator] = n
+ self.cache_total += n.size()
return n
return None
def set(self, slot, blob):
# Store `blob` in `slot` and, when slot.set() reports that it stored
# the value, add the slot's size to the running self.cache_total.
# NOTE(review): only part of this method is visible here (the ENOMEM
# branch body and the code between the two attempts are not shown),
# so these notes cover the visible lines only.
# NOTE(review): in the lines shown, the cache_total increments are not
# under self._cache_lock -- confirm how updates are serialized.
try:
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
return
except OSError as e:
if e.errno == errno.ENOMEM:
elif e.errno == errno.ENOSPC:
# Reduce disk max space to current - 256 MiB, cap cache and retry
with self._cache_lock:
- sm = sum([st.size() for st in self._cache.values()])
+ sm = sum(st.size() for st in self._cache.values())
# The new limit is never set below 256 MiB.
self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
elif e.errno == errno.ENODEV:
_logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
# exception handler adjusts limits downward in some cases
# to free up resources, which would make the operation
# succeed.
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
except Exception as e:
# It failed again. Give up.
# Presumably setting None marks the slot ready with no content so
# that waiters are released -- confirm against the slot class.
slot.set(None)
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
- self.num_prefetch_threads = num_prefetch_threads or 2
+ if num_prefetch_threads is not None:
+ self.num_prefetch_threads = num_prefetch_threads
+ else:
+ self.num_prefetch_threads = 2
self._prefetch_queue = None
self._prefetch_threads = None