From 062542e47eb3bbb1ad911f2bcb6e51967f80db86 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 1 Apr 2024 09:52:53 -0400 Subject: [PATCH] 21639: Use a more efficient data structure for the keep block cache Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/python/arvados/diskcache.py | 4 +- sdk/python/arvados/keep.py | 78 +++++++++++++++++---------------- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py index f8fca57803..23c0a80cbf 100644 --- a/sdk/python/arvados/diskcache.py +++ b/sdk/python/arvados/diskcache.py @@ -237,13 +237,13 @@ class DiskCacheSlot(object): # Map in all the files we found, up to maxslots, if we exceed # maxslots, start throwing things out. - cachelist = [] + cachelist = collections.OrderedDict() for b in blocks: got = DiskCacheSlot.get_from_disk(b[0], cachedir) if got is None: continue if len(cachelist) < maxslots: - cachelist.append(got) + cachelist[got.locator] = got else: # we found more blocks than maxslots, try to # throw it out of the cache. diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 4b00f7df8b..51d40dbece 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -182,7 +182,7 @@ class Keep(object): class KeepBlockCache(object): def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None): self.cache_max = cache_max - self._cache = [] + self._cache = collections.OrderedDict() self._cache_lock = threading.Lock() self._max_slots = max_slots self._disk_cache = disk_cache @@ -271,33 +271,36 @@ class KeepBlockCache(object): # Try and make sure the contents of the cache do not exceed # the supplied maximums. - # Select all slots except those where ready.is_set() and content is - # None (that means there was an error reading the block). - self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)] - sm = sum([slot.size() for slot in self._cache]) + _evict_candidates = collections.deque(self._cache.values()) + sm = sum([slot.size() for slot in _evict_candidates]) while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots): - for i in range(len(self._cache)-1, -1, -1): - # start from the back, find a slot that is a candidate to evict - if self._cache[i].ready.is_set(): - sz = self._cache[i].size() - - # If evict returns false it means the - # underlying disk cache couldn't lock the file - # for deletion because another process was using - # it. Don't count it as reducing the amount - # of data in the cache, find something else to - # throw out. - if self._cache[i].evict(): - sm -= sz - - # check to make sure the underlying data is gone - if self._cache[i].gone(): - # either way we forget about it. either the - # other process will delete it, or if we need - # it again and it is still there, we'll find - # it on disk. - del self._cache[i] - break + slot = _evict_candidates.popleft() + if not slot.ready.is_set(): + continue + + if slot.content is None: + # error + del self._cache[slot.locator] + continue + + sz = slot.size() + + # If evict returns false it means the + # underlying disk cache couldn't lock the file + # for deletion because another process was using + # it. Don't count it as reducing the amount + # of data in the cache, find something else to + # throw out. + if slot.evict(): + sm -= sz + + # check to make sure the underlying data is gone + if slot.gone(): + # either way we forget about it. either the + # other process will delete it, or if we need + # it again and it is still there, we'll find + # it on disk. + del self._cache[slot.locator] def cap_cache(self): @@ -308,19 +311,15 @@ class KeepBlockCache(object): def _get(self, locator): # Test if the locator is already in the cache - for i in range(0, len(self._cache)): - if self._cache[i].locator == locator: - n = self._cache[i] - if i != 0: - # move it to the front - del self._cache[i] - self._cache.insert(0, n) - return n + if locator in self._cache: + n = self._cache[locator] + self._cache.move_to_back(locator) + return n if self._disk_cache: # see if it exists on disk n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir) if n is not None: - self._cache.insert(0, n) + self._cache[n.locator] = n return n return None @@ -350,7 +349,7 @@ class KeepBlockCache(object): n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir) else: n = KeepBlockCache.CacheSlot(locator) - self._cache.insert(0, n) + self._cache[n.locator] = n return n, True def set(self, slot, blob): @@ -365,7 +364,7 @@ class KeepBlockCache(object): elif e.errno == errno.ENOSPC: # Reduce disk max space to current - 256 MiB, cap cache and retry with self._cache_lock: - sm = sum([st.size() for st in self._cache]) + sm = sum([st.size() for st in self._cache.values()]) self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) elif e.errno == errno.ENODEV: _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") @@ -1426,6 +1425,9 @@ class KeepClient(object): does not block. """ + if self.block_cache.get(locator) is not None: + return + self._start_prefetch_threads() self._prefetch_queue.put(locator) -- 2.30.2