18799: Move canonical discovery document to sdk/python
[arvados.git] / sdk / python / arvados / keep.py
index 44e915776734fe87020ba46b5d95d9985f8e8dfe..cbe96ffa2f1c8d038e95f6174b3dc44ec739770a 100644 (file)
@@ -15,6 +15,7 @@ from builtins import object
 import collections
 import datetime
 import hashlib
+import errno
 import io
 import logging
 import math
@@ -26,8 +27,11 @@ import socket
 import ssl
 import sys
 import threading
+import resource
 from . import timer
 import urllib.parse
+import traceback
+import weakref
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -39,6 +43,7 @@ import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
+import arvados.diskcache
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -174,11 +179,63 @@ class Keep(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
-    # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
+        self._max_slots = max_slots
+        self._disk_cache = disk_cache
+        self._disk_cache_dir = disk_cache_dir
+        self._cache_updating = threading.Condition(self._cache_lock)
+
+        if self._disk_cache and self._disk_cache_dir is None:
+            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+        if self._max_slots == 0:
+            if self._disk_cache:
+                # Each block uses two file descriptors, one used to
+                # open it initially and hold the flock(), and a second
+                # hidden one used by mmap().
+                #
+                # Set max slots to 1/8 of maximum file handles.  This
+                # means we'll use at most 1/4 of total file handles.
+                #
+                # NOFILE typically defaults to 1024 on Linux so this
+                # is 128 slots (256 file handles), which means we can
+                # cache up to 8 GiB of 64 MiB blocks.  This leaves
+                # 768 file handles for sockets and other stuff.
+                #
+                # When we want the ability to have more cache (e.g. in
+                # arv-mount) we'll increase rlimit before calling
+                # this.
+                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
+            else:
+                # RAM cache slots
+                self._max_slots = 512
+
+        if self.cache_max == 0:
+            if self._disk_cache:
+                fs = os.statvfs(self._disk_cache_dir)
+                # Calculation of available space incorporates existing cache usage
+                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
+                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+                # pick smallest of:
+                # 10% of total disk size
+                # 25% of available space
+                # max_slots * 64 MiB
+                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+            else:
+                # 256 MiB in RAM
+                self.cache_max = (256 * 1024 * 1024)
+
+        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            self.cap_cache()
+
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
@@ -202,19 +259,51 @@ class KeepBlockCache(object):
             else:
                 return len(self.content)
 
+        def evict(self):
+            self.content = None
+            return self.gone()
+
+        def gone(self):
+            return (self.content is None)
+
+    def _resize_cache(self, cache_max, max_slots):
+        # Try and make sure the contents of the cache do not exceed
+        # the supplied maximums.
+
+        # Select all slots except those where ready.is_set() and content is
+        # None (that means there was an error reading the block).
+        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+        sm = sum([slot.size() for slot in self._cache])
+        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+            for i in range(len(self._cache)-1, -1, -1):
+                # start from the back, find a slot that is a candidate to evict
+                if self._cache[i].ready.is_set():
+                    sz = self._cache[i].size()
+
+                    # If evict returns false it means the
+                    # underlying disk cache couldn't lock the file
+                    # for deletion because another process was using
+                    # it. Don't count it as reducing the amount
+                    # of data in the cache, find something else to
+                    # throw out.
+                    if self._cache[i].evict():
+                        sm -= sz
+
+                    # check to make sure the underlying data is gone
+                    if self._cache[i].gone():
+                        # either way we forget about it.  either the
+                        # other process will delete it, or if we need
+                        # it again and it is still there, we'll find
+                        # it on disk.
+                        del self._cache[i]
+                    break
+
+
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
-        with self._cache_lock:
-            # Select all slots except those where ready.is_set() and content is
-            # None (that means there was an error reading the block).
-            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-            sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and sm > self.cache_max:
-                for i in range(len(self._cache)-1, -1, -1):
-                    if self._cache[i].ready.is_set():
-                        del self._cache[i]
-                        break
-                sm = sum([slot.size() for slot in self._cache])
+        with self._cache_updating:
+            self._resize_cache(self.cache_max, self._max_slots)
+            self._cache_updating.notify_all()
 
     def _get(self, locator):
         # Test if the locator is already in the cache
@@ -226,6 +315,12 @@ class KeepBlockCache(object):
                     del self._cache[i]
                     self._cache.insert(0, n)
                 return n
+        if self._disk_cache:
+            # see if it exists on disk
+            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+            if n is not None:
+                self._cache.insert(0, n)
+                return n
         return None
 
     def get(self, locator):
@@ -235,16 +330,66 @@ class KeepBlockCache(object):
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        with self._cache_lock:
+        with self._cache_updating:
             n = self._get(locator)
             if n:
                 return n, False
             else:
                 # Add a new cache slot for the locator
-                n = KeepBlockCache.CacheSlot(locator)
+                self._resize_cache(self.cache_max, self._max_slots-1)
+                while len(self._cache) >= self._max_slots:
+                    # If there isn't a slot available, need to wait
+                    # for something to happen that releases one of the
+                    # cache slots.  Idle for 200 ms or woken up by
+                    # another thread
+                    self._cache_updating.wait(timeout=0.2)
+                    self._resize_cache(self.cache_max, self._max_slots-1)
+
+                if self._disk_cache:
+                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+                else:
+                    n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
 
+    def set(self, slot, blob):
+        try:
+            slot.set(blob)
+            return
+        except OSError as e:
+            if e.errno == errno.ENOMEM:
+                # Reduce max slots to current - 4, cap cache and retry
+                with self._cache_lock:
+                    self._max_slots = max(4, len(self._cache) - 4)
+            elif e.errno == errno.ENOSPC:
+                # Reduce disk max space to current - 256 MiB, cap cache and retry
+                with self._cache_lock:
+                    sm = sum([st.size() for st in self._cache])
+                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+            elif e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+        except Exception as e:
+            pass
+        finally:
+            # Check if we should evict things from the cache.  Either
+            # because we added a new thing or there was an error and
+            # we possibly adjusted the limits down, so we might need
+            # to push something out.
+            self.cap_cache()
+
+        try:
+            # Only gets here if there was an error the first time. The
+            # exception handler adjusts limits downward in some cases
+            # to free up resources, which would make the operation
+            # succeed.
+            slot.set(blob)
+        except Exception as e:
+            # It failed again.  Give up.
+            slot.set(None)
+            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+
+        self.cap_cache()
+
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
@@ -1170,8 +1315,7 @@ class KeepClient(object):
                 return blob
         finally:
             if slot is not None:
-                slot.set(blob)
-                self.block_cache.cap_cache()
+                self.block_cache.set(slot, blob)
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?