18842: Add locking and cachedir cleanup, needs testing
author    Peter Amstutz <peter.amstutz@curii.com>
          Wed, 19 Oct 2022 21:18:20 +0000 (17:18 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
          Wed, 19 Oct 2022 21:18:20 +0000 (17:18 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/diskcache.py

index 24f249f1d0fde09edb64bd0c1fb8b10ab175bad5..9f218e56ce9ab6e3f3fc363d56fbc141b987249d 100644
@@ -9,6 +9,7 @@ import traceback
 import stat
 import tempfile
 import hashlib
+import fcntl
 
 class DiskCacheSlot(object):
     __slots__ = ("locator", "ready", "content", "cachedir")
@@ -46,6 +47,11 @@ class DiskCacheSlot(object):
             f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
             tmpfile = f.name
             os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
+
+            # Acquire a shared lock.  This tells other processes that
+            # we're using this block and that they should not delete it.
+            fcntl.flock(f, fcntl.LOCK_SH)
+
             f.write(value)
             f.flush()
             os.rename(tmpfile, final)
@@ -86,6 +92,12 @@ class DiskCacheSlot(object):
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             final = os.path.join(blockdir, self.locator)
             try:
-                os.remove(final)
+                # If we can't take a non-blocking exclusive lock,
+                # flock() raises an error.  That's fine and desirable:
+                # it means another process still holds a lock on this
+                # block and we shouldn't delete it.
+                with open(final, "rb") as f:
+                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    os.remove(final)
             except OSError:
                 pass
@@ -98,6 +110,11 @@ class DiskCacheSlot(object):
 
         try:
             filehandle = open(final, "rb")
+
+            # Acquire a shared lock.  This tells other processes that
+            # we're using this block and that they should not delete it.
+            fcntl.flock(filehandle, fcntl.LOCK_SH)
+
             content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
             disk_md5 = hashlib.md5(content).hexdigest()
             if disk_md5 == locator:
@@ -111,3 +128,38 @@ class DiskCacheSlot(object):
             traceback.print_exc()
 
         return None
+
+    @staticmethod
+    def cleanup_cachedir(cachedir, maxsize):
+        blocks = []
+        totalsize = 0
+        for root, dirs, files in os.walk(cachedir):
+            for name in files:
+                blockpath = os.path.join(root, name)
+                res = os.stat(blockpath)
+                blocks.append((blockpath, res.st_size, res.st_atime))
+                totalsize += res.st_size
+
+        if totalsize <= maxsize:
+            return
+
+        # Sort by access time (atime) so the blocks that have gone
+        # unused the longest get deleted first.
+        blocks.sort(key=lambda x: x[2])
+
+        # Go through the list and try deleting blocks until we're
+        # below the target size or we run out of blocks.
+        i = 0
+        while i < len(blocks) and totalsize > maxsize:
+            try:
+                with open(blocks[i][0], "rb") as f:
+                    # If we can't take a non-blocking exclusive
+                    # lock, flock() raises an error.  That's fine
+                    # and desirable: it means another process holds
+                    # a lock and we shouldn't delete the block.
+                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    os.remove(blocks[i][0])
+                    totalsize -= blocks[i][1]
+            except OSError:
+                pass
+            i += 1
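
The locking scheme in this change relies on advisory flock() semantics:
readers take a shared lock on a cached block while they hold it open, and
the eviction and cleanup paths only delete a block after winning a
non-blocking exclusive lock, which fails while any shared lock is
outstanding.  A minimal standalone sketch of that protocol, with
hypothetical helper names that are not part of the Arvados SDK:

    import fcntl
    import os

    def open_block(path):
        # Reader: take a shared lock on the block.  Any number of
        # readers may hold shared locks at once; the lock is released
        # when the file object is closed.
        f = open(path, "rb")
        fcntl.flock(f, fcntl.LOCK_SH)
        return f

    def try_delete_block(path):
        # Cleaner: LOCK_EX | LOCK_NB raises OSError if any other
        # process holds a lock on the file, meaning the block is
        # still in use and must be left alone.
        try:
            with open(path, "rb") as f:
                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                os.remove(path)
                return True
        except OSError:
            return False

For example, if process A has called open_block() on a cached block,
try_delete_block() in process B fails (returns False) until process A
closes its file handle, at which point the cleaner can reclaim the space.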