X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cc3f89292c0136ce5d9e56506f82ea743c59fff8..08f8ec2186df6d3111fdd274e71d9e12742f5e9d:/sdk/python/arvados/keep.py?ds=sidebyside diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 1d0fc5f159..a824621079 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -233,11 +233,13 @@ class KeepBlockCache(object): self.cache_max = max(self.cache_max, 64 * 1024 * 1024) + self.cache_total = 0 if self._disk_cache: self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots) + for slot in self._cache.values(): + self.cache_total += slot.size() self.cap_cache() - class CacheSlot(object): __slots__ = ("locator", "ready", "content") @@ -251,8 +253,11 @@ class KeepBlockCache(object): return self.content def set(self, value): + if self.content is not None: + return False self.content = value self.ready.set() + return True def size(self): if self.content is None: @@ -262,45 +267,25 @@ class KeepBlockCache(object): def evict(self): self.content = None - return self.gone() - def gone(self): - return (self.content is None) def _resize_cache(self, cache_max, max_slots): # Try and make sure the contents of the cache do not exceed # the supplied maximums. + if self.cache_total <= cache_max and len(self._cache) <= max_slots: + return + _evict_candidates = collections.deque(self._cache.values()) - sm = sum([slot.size() for slot in _evict_candidates]) - while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots): + while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots): slot = _evict_candidates.popleft() if not slot.ready.is_set(): continue - if slot.content is None: - # error - del self._cache[slot.locator] - continue - sz = slot.size() - - # If evict returns false it means the - # underlying disk cache couldn't lock the file - # for deletion because another process was using - # it. Don't count it as reducing the amount - # of data in the cache, find something else to - # throw out. - if slot.evict(): - sm -= sz - - # check to make sure the underlying data is gone - if slot.gone(): - # either way we forget about it. either the - # other process will delete it, or if we need - # it again and it is still there, we'll find - # it on disk. - del self._cache[slot.locator] + slot.evict() + self.cache_total -= sz + del self._cache[slot.locator] def cap_cache(self): @@ -323,6 +308,7 @@ class KeepBlockCache(object): n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir) if n is not None: self._cache[n.locator] = n + self.cache_total += n.size() return n return None @@ -357,7 +343,8 @@ class KeepBlockCache(object): def set(self, slot, blob): try: - slot.set(blob) + if slot.set(blob): + self.cache_total += slot.size() return except OSError as e: if e.errno == errno.ENOMEM: @@ -367,7 +354,7 @@ class KeepBlockCache(object): elif e.errno == errno.ENOSPC: # Reduce disk max space to current - 256 MiB, cap cache and retry with self._cache_lock: - sm = sum([st.size() for st in self._cache.values()]) + sm = sum(st.size() for st in self._cache.values()) self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) elif e.errno == errno.ENODEV: _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") @@ -385,7 +372,8 @@ class KeepBlockCache(object): # exception handler adjusts limits downward in some cases # to free up resources, which would make the operation # succeed. - slot.set(blob) + if slot.set(blob): + self.cache_total += slot.size() except Exception as e: # It failed again. Give up. slot.set(None) @@ -926,7 +914,10 @@ class KeepClient(object): self.misses_counter = Counter() self._storage_classes_unsupported_warning = False self._default_classes = [] - self.num_prefetch_threads = num_prefetch_threads or 2 + if num_prefetch_threads is not None: + self.num_prefetch_threads = num_prefetch_threads + else: + self.num_prefetch_threads = 2 self._prefetch_queue = None self._prefetch_threads = None