18842: Respond to review comments
[arvados.git] / sdk / python / arvados / diskcache.py
index 6f1ccb97e577cca3140867652161d06535a15ca7..9734d93a7742d6bf395b4fe2dbed2e907ab50a4d 100644 (file)
@@ -9,6 +9,12 @@ import traceback
 import stat
 import tempfile
 import fcntl
+import errno
+import logging
+
+_logger = logging.getLogger('arvados.keep')
+
+cacheblock_suffix = ".keepcacheblock"
 
 class DiskCacheSlot(object):
     __slots__ = ("locator", "ready", "content", "cachedir")
@@ -24,6 +30,7 @@ class DiskCacheSlot(object):
         return self.content
 
     def set(self, value):
+        tmpfile = None
         try:
             if value is None:
                 self.content = None
@@ -41,9 +48,9 @@ class DiskCacheSlot(object):
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             os.makedirs(blockdir, mode=0o700, exist_ok=True)
 
-            final = os.path.join(blockdir, self.locator)
+            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
 
-            f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
+            f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False, prefix="tmp", suffix=cacheblock_suffix)
             tmpfile = f.name
             os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
 
@@ -54,11 +61,32 @@ class DiskCacheSlot(object):
             f.write(value)
             f.flush()
             os.rename(tmpfile, final)
+            tmpfile = None
 
             self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+        except OSError as e:
+            if e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+            elif e.errno == errno.ENOMEM:
+                _logger.error("Unable to use disk cache: The process's maximum number of mappings would have been exceeded.")
+            elif e.errno == errno.ENOSPC:
+                _logger.error("Unable to use disk cache: Out of disk space.")
+            else:
+                traceback.print_exc()
         except Exception as e:
             traceback.print_exc()
         finally:
+            if tmpfile is not None:
+                # If the tempfile hasn't been renamed on disk yet, try to delete it.
+                try:
+                    os.remove(tmpfile)
+                except:
+                    pass
+            if self.content is None:
+                # Something went wrong with the disk cache; fall back
+                # to RAM cache behavior (the alternative is to cache
+                # nothing and return a read error).
+                self.content = value
             self.ready.set()
 
     def size(self):
@@ -89,11 +117,12 @@ class DiskCacheSlot(object):
             # gone.
 
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
-            final = os.path.join(blockdir, self.locator)
+            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
             try:
                 with open(final, "rb") as f:
-                    # unlock,
+                    # unlock
                     fcntl.flock(f, fcntl.LOCK_UN)
+                    self.content = None
 
                     # try to get an exclusive lock, this ensures other
                     # processes are not using the block.  It is
@@ -125,7 +154,7 @@ class DiskCacheSlot(object):
     @staticmethod
     def get_from_disk(locator, cachedir):
         blockdir = os.path.join(cachedir, locator[0:3])
-        final = os.path.join(blockdir, locator)
+        final = os.path.join(blockdir, locator) + cacheblock_suffix
 
         try:
             filehandle = open(final, "rb")
@@ -156,9 +185,21 @@ class DiskCacheSlot(object):
         blocks = []
         for root, dirs, files in os.walk(cachedir):
             for name in files:
+                if not name.endswith(cacheblock_suffix):
+                    continue
+
                 blockpath = os.path.join(root, name)
                 res = os.stat(blockpath)
-                blocks.append((name, res.st_atime))
+
+                if len(name) == (32+len(cacheblock_suffix)) and not name.startswith("tmp"):
+                    blocks.append((name[0:32], res.st_atime))
+                elif name.startswith("tmp") and ((time.time() - res.st_mtime) > 60):
+                    # Found a temporary file more than 1 minute old;
+                    # try to delete it.
+                    try:
+                        os.remove(blockpath)
+                    except:
+                        pass
 
         # sort by access time (atime), going from most recently
         # accessed (highest timestamp) to least recently accessed