21639: Use a more efficient data structure for the keep block cache
author Peter Amstutz <peter.amstutz@curii.com>
Mon, 1 Apr 2024 13:52:53 +0000 (09:52 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Mon, 1 Apr 2024 13:52:53 +0000 (09:52 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/diskcache.py
sdk/python/arvados/keep.py

index f8fca5780332e41ec1f894759b27df5c0bffd1a1..23c0a80cbfc0bd9c4f01e7f1dc152b8cbbfc65ff 100644 (file)
@@ -237,13 +237,13 @@ class DiskCacheSlot(object):
 
         # Map in all the files we found, up to maxslots, if we exceed
         # maxslots, start throwing things out.
-        cachelist = []
+        cachelist = collections.OrderedDict()
         for b in blocks:
             got = DiskCacheSlot.get_from_disk(b[0], cachedir)
             if got is None:
                 continue
             if len(cachelist) < maxslots:
-                cachelist.append(got)
+                cachelist[got.locator] = got
             else:
                 # we found more blocks than maxslots, try to
                 # throw it out of the cache.
index 4b00f7df8b912d95488d86879e3a29b83c067fec..51d40dbece2525ae8cf537c5120a0c94b45de0ba 100644 (file)
@@ -182,7 +182,7 @@ class Keep(object):
 class KeepBlockCache(object):
     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
-        self._cache = []
+        self._cache = collections.OrderedDict()
         self._cache_lock = threading.Lock()
         self._max_slots = max_slots
         self._disk_cache = disk_cache
@@ -271,33 +271,36 @@ class KeepBlockCache(object):
         # Try and make sure the contents of the cache do not exceed
         # the supplied maximums.
 
-        # Select all slots except those where ready.is_set() and content is
-        # None (that means there was an error reading the block).
-        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-        sm = sum([slot.size() for slot in self._cache])
+        _evict_candidates = collections.deque(self._cache.values())
+        sm = sum([slot.size() for slot in _evict_candidates])
         while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
-            for i in range(len(self._cache)-1, -1, -1):
-                # start from the back, find a slot that is a candidate to evict
-                if self._cache[i].ready.is_set():
-                    sz = self._cache[i].size()
-
-                    # If evict returns false it means the
-                    # underlying disk cache couldn't lock the file
-                    # for deletion because another process was using
-                    # it. Don't count it as reducing the amount
-                    # of data in the cache, find something else to
-                    # throw out.
-                    if self._cache[i].evict():
-                        sm -= sz
-
-                    # check to make sure the underlying data is gone
-                    if self._cache[i].gone():
-                        # either way we forget about it.  either the
-                        # other process will delete it, or if we need
-                        # it again and it is still there, we'll find
-                        # it on disk.
-                        del self._cache[i]
-                    break
+            slot = _evict_candidates.popleft()
+            if not slot.ready.is_set():
+                continue
+
+            if slot.content is None:
+                # error
+                del self._cache[slot.locator]
+                continue
+
+            sz = slot.size()
+
+            # If evict returns false it means the
+            # underlying disk cache couldn't lock the file
+            # for deletion because another process was using
+            # it. Don't count it as reducing the amount
+            # of data in the cache, find something else to
+            # throw out.
+            if slot.evict():
+                sm -= sz
+
+            # check to make sure the underlying data is gone
+            if slot.gone():
+                # either way we forget about it.  either the
+                # other process will delete it, or if we need
+                # it again and it is still there, we'll find
+                # it on disk.
+                del self._cache[slot.locator]
 
 
     def cap_cache(self):
@@ -308,19 +311,15 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in range(0, len(self._cache)):
-            if self._cache[i].locator == locator:
-                n = self._cache[i]
-                if i != 0:
-                    # move it to the front
-                    del self._cache[i]
-                    self._cache.insert(0, n)
-                return n
+        if locator in self._cache:
+            n = self._cache[locator]
+            self._cache.move_to_back(locator)
+            return n
         if self._disk_cache:
             # see if it exists on disk
             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
             if n is not None:
-                self._cache.insert(0, n)
+                self._cache[n.locator] = n
                 return n
         return None
 
@@ -350,7 +349,7 @@ class KeepBlockCache(object):
                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                 else:
                     n = KeepBlockCache.CacheSlot(locator)
-                self._cache.insert(0, n)
+                self._cache[n.locator] = n
                 return n, True
 
     def set(self, slot, blob):
@@ -365,7 +364,7 @@ class KeepBlockCache(object):
             elif e.errno == errno.ENOSPC:
                 # Reduce disk max space to current - 256 MiB, cap cache and retry
                 with self._cache_lock:
-                    sm = sum([st.size() for st in self._cache])
+                    sm = sum([st.size() for st in self._cache.values()])
                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
             elif e.errno == errno.ENODEV:
                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
@@ -1426,6 +1425,9 @@ class KeepClient(object):
         does not block.
         """
 
+        if self.block_cache.get(locator) is not None:
+            return
+
         self._start_prefetch_threads()
         self._prefetch_queue.put(locator)