18842: When starting up the disk cache, map in everything
author Peter Amstutz <peter.amstutz@curii.com>
Thu, 20 Oct 2022 20:18:00 +0000 (16:18 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Thu, 20 Oct 2022 20:18:00 +0000 (16:18 -0400)
When a process starts, the first thing it will do is map in the
existing blocks and prune any excess from the cache.

If there are multiple processes using the same cache dir, the worst
case is N*M usage, where N is the number of processes and M is the
per-process cache limit (for example, four processes each with an
8 GiB limit could together use 32 GiB).
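
As a rough sketch of the intended startup sequence (DiskCacheSlot.init_cache
is added by this commit; the cache dir path and slot limit below are
hypothetical, not values from the change):

    import arvados.diskcache

    cachedir = "/home/user/.cache/arvados/keep"   # hypothetical cache dir
    maxslots = 512                                # hypothetical slot limit

    # Map in every block already on disk, most recently accessed first;
    # blocks beyond maxslots are evicted instead of being kept.
    slots = arvados.diskcache.DiskCacheSlot.init_cache(cachedir, maxslots)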

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/diskcache.py
sdk/python/arvados/keep.py
sdk/python/tests/arvados_testutil.py
services/fuse/tests/mount_test_base.py

index 9f218e56ce9ab6e3f3fc363d56fbc141b987249d..c2afd3bfc310eda1c18392c48f17a1c5be45ac20 100644 (file)
@@ -8,8 +8,6 @@ import os
 import traceback
 import stat
 import tempfile
-import hashlib
-import fcntl
 
 class DiskCacheSlot(object):
     __slots__ = ("locator", "ready", "content", "cachedir")
@@ -48,10 +46,6 @@ class DiskCacheSlot(object):
             tmpfile = f.name
             os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
 
-            # aquire a shared lock, this tells other processes that
-            # we're using this block and to please not delete it.
-            fcntl.flock(f, fcntl.LOCK_SH)
-
             f.write(value)
             f.flush()
             os.rename(tmpfile, final)
@@ -92,35 +86,23 @@ class DiskCacheSlot(object):
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             final = os.path.join(blockdir, self.locator)
             try:
-                # If we can't upgrade our shared lock to an exclusive
-                # lock, it'll throw an error, that's fine and
-                # desirable, it means another process has a lock and
-                # we shouldn't delete the block.
-                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                 os.remove(final)
             except OSError:
                 pass
 
     @staticmethod
     def get_from_disk(locator, cachedir):
-        # Get it, check it, return it
         blockdir = os.path.join(cachedir, locator[0:3])
         final = os.path.join(blockdir, locator)
 
         try:
             filehandle = open(final, "rb")
 
-            # aquire a shared lock, this tells other processes that
-            # we're using this block and to please not delete it.
-            fcntl.flock(f, fcntl.LOCK_SH)
-
             content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
-            disk_md5 = hashlib.md5(content).hexdigest()
-            if disk_md5 == locator:
-                dc = DiskCacheSlot(locator, cachedir)
-                dc.content = content
-                dc.ready.set()
-                return dc
+            dc = DiskCacheSlot(locator, cachedir)
+            dc.content = content
+            dc.ready.set()
+            return dc
         except FileNotFoundError:
             pass
         except Exception as e:
@@ -129,36 +111,36 @@ class DiskCacheSlot(object):
         return None
 
     @staticmethod
-    def cleanup_cachedir(cachedir, maxsize):
+    def init_cache(cachedir, maxslots):
+        # map in all the files in the cache directory, up to max slots.
+        # after max slots, try to delete the excess blocks.
+        #
+        # this gives the calling process ownership of all the blocks
+
         blocks = []
-        totalsize = 0
         for root, dirs, files in os.walk(cachedir):
             for name in files:
                 blockpath = os.path.join(root, name)
                 res = os.stat(blockpath)
-                blocks.append((blockpath, res.st_size, res.st_atime))
-                totalsize += res.st_size
-
-        if totalsize <= maxsize:
-            return
-
-        # sort by atime, so the blocks accessed the longest time in
-        # the past get deleted first.
-        blocks.sort(key=lambda x: x[2])
-
-        # go through the list and try deleting blocks until we're
-        # below the target size and/or we run out of blocks
-        i = 0
-        while i < len(blocks) and totalsize > maxsize:
-            try:
-                with open(blocks[i][0], "rb") as f:
-                    # If we can't get an exclusive lock, it'll
-                    # throw an error, that's fine and desirable,
-                    # it means another process has a lock and we
-                    # shouldn't delete the block.
-                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
-                    os.remove(block)
-                    totalsize -= blocks[i][1]
-            except OSError:
-                pass
-            i += 1
+                blocks.append((name, res.st_atime))
+
+        # sort by access time (atime), going from most recently
+        # accessed (highest timestamp) to least recently accessed
+        # (lowest timestamp).
+        blocks.sort(key=lambda x: x[1], reverse=True)
+
+        # Map in all the files we found, up to maxslots; once we exceed
+        # maxslots, evict the excess blocks.
+        cachelist = []
+        for b in blocks:
+            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
+            if got is None:
+                continue
+            if len(cachelist) < maxslots:
+                cachelist.append(got)
+            else:
+                # we found more blocks than maxslots, so try to
+                # evict this one from the cache.
+                got.evict()
+
+        return cachelist
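
For illustration only, the walk/sort/evict pattern that init_cache applies
can be sketched standalone (this is an assumption-labeled mock of the same
strategy, not the Arvados code itself):

    import os

    def prune_cache_dir(cachedir, maxslots):
        # Walk the cache dir and record each block's path and atime.
        blocks = []
        for root, dirs, files in os.walk(cachedir):
            for name in files:
                path = os.path.join(root, name)
                blocks.append((path, os.stat(path).st_atime))
        # Most recently accessed first, matching init_cache's ordering.
        blocks.sort(key=lambda b: b[1], reverse=True)
        # Keep up to maxslots blocks; delete the rest.
        for path, _ in blocks[maxslots:]:
            try:
                os.remove(path)
            except OSError:
                pass
        return [path for path, _ in blocks[:maxslots]]
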
index 7f316c153cad1f34dab4df84814ab49bb2c14ac7..8d95b2dc714e696a6b2acbf73d0291a8cd1153cf 100644 (file)
@@ -207,6 +207,11 @@ class KeepBlockCache(object):
 
         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
 
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            self.cap_cache()
+
+
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
 
index a574508cbdcca567692a4120aeb3d6da60c72e4d..3772761b88a2b9ef6be3b7b4ec724936f4e36d5a 100644 (file)
@@ -290,8 +290,8 @@ def binary_compare(a, b):
     return True
 
 def make_block_cache(disk_cache):
-    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     if disk_cache:
         disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
         shutil.rmtree(disk_cache_dir, ignore_errors=True)
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     return block_cache
index a2400506446c4735af81f3d0f22da436493232e1..b1383d36bbbc7cffafb5819b1388d273542aa3ce 100644 (file)
@@ -26,10 +26,10 @@ logger = logging.getLogger('arvados.arv-mount')
 from .integration_test import workerPool
 
 def make_block_cache(disk_cache):
-    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     if disk_cache:
         disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
         shutil.rmtree(disk_cache_dir, ignore_errors=True)
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     return block_cache
 
 class MountTestBase(unittest.TestCase):