18842: Turn disk write errors into KeepCacheError
[arvados.git] / sdk / python / arvados / keep.py
index dd99e8b928054ed03e42196608f0a9697f65870b..ce4c6f81f62d997cc5adfd37ebf4e3664ec68be7 100644 (file)
@@ -15,6 +15,7 @@ from builtins import object
 import collections
 import datetime
 import hashlib
+import errno
 import io
 import logging
 import math
@@ -29,6 +30,7 @@ import threading
 import resource
 from . import timer
 import urllib.parse
+import traceback
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -201,7 +203,9 @@ class KeepBlockCache(object):
         if self.cache_max == 0:
             if self._disk_cache:
                 fs = os.statvfs(self._disk_cache_dir)
-                avail = (fs.f_bavail * fs.f_bsize) / 4
+                # Calculation of available space incorporates existing cache usage
+                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                 # pick smallest of:
                 # 10% of total disk size
@@ -209,7 +213,7 @@ class KeepBlockCache(object):
                 # max_slots * 64 MiB
                 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
             else:
-                # 256 GiB in RAM
+                # 256 MiB in RAM
                 self.cache_max = (256 * 1024 * 1024)
 
         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
@@ -311,6 +315,42 @@ class KeepBlockCache(object):
                 self._cache.insert(0, n)
                 return n, True
 
+    def set(self, slot, blob):
+        tryagain = False
+
+        try:
+            slot.set(blob)
+        except OSError as e:
+            tryagain = True
+            if e.errno == errno.ENOMEM:
+                # Reduce max slots to current - 4, cap cache and retry
+                with self._cache_lock:
+                    self._max_slots = max(4, len(self._cache) - 4)
+            elif e.errno == errno.ENOSPC:
+                # Reduce disk max space to current - 256 MiB, cap cache and retry
+                with self._cache_lock:
+                    sm = sum([st.size() for st in self._cache])
+                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+            elif e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+        except Exception as e:
+            tryagain = True
+
+        try:
+            if tryagain:
+                # There was an error.  Evict some slots and try again.
+                self.cap_cache()
+                slot.set(blob)
+        except Exception as e:
+            # It failed again.  Give up.
+            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+        finally:
+            # Set the notice that that we are done with the cache
+            # slot one way or another.
+            slot.ready.set()
+
+        self.cap_cache()
+
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
@@ -1236,8 +1276,7 @@ class KeepClient(object):
                 return blob
         finally:
             if slot is not None:
-                slot.set(blob)
-                self.block_cache.cap_cache()
+                self.block_cache.set(slot, blob)
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?