import collections
import datetime
import hashlib
+import errno
import io
import logging
import math
import ssl
import sys
import threading
+import resource
from . import timer
import urllib.parse
+import traceback
if sys.version_info >= (3, 0):
from io import BytesIO
import arvados.errors
import arvados.retry as retry
import arvados.util
+import arvados.diskcache
_logger = logging.getLogger('arvados.keep')
global_client_object = None
config.get('ARVADOS_API_TOKEN'),
config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
config.get('ARVADOS_KEEP_PROXY'),
- config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
os.environ.get('KEEP_LOCAL_STORE'))
if (global_client_object is None) or (cls._last_key != key):
global_client_object = KeepClient()
return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
- # Default RAM cache is 256MiB
- def __init__(self, cache_max=(256 * 1024 * 1024)):
+ def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
self.cache_max = cache_max
self._cache = []
self._cache_lock = threading.Lock()
+ self._max_slots = max_slots
+ self._disk_cache = disk_cache
+ self._disk_cache_dir = disk_cache_dir
+
+ if self._disk_cache and self._disk_cache_dir is None:
+ self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+ os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+ if self._max_slots == 0:
+ if self._disk_cache:
+ # default max slots to half of maximum file handles
+ # NOFILE typically defaults to 1024 on Linux so this
+ # will be 512 slots.
+ self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+ else:
+ # RAM cache slots
+ self._max_slots = 512
+
+ if self.cache_max == 0:
+ if self._disk_cache:
+ fs = os.statvfs(self._disk_cache_dir)
+ # Calculation of available space incorporates existing cache usage
+ existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+ avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
+ maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+ # pick smallest of:
+ # 10% of total disk size
+ # 25% of available space
+ # max_slots * 64 MiB
+ self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+ else:
+ # 256 MiB in RAM
+ self.cache_max = (256 * 1024 * 1024)
+
+ self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+ if self._disk_cache:
+ self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+ self.cap_cache()
+
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
else:
return len(self.content)
+ def evict(self):
+ return True
+
def cap_cache(self):
'''Cap the cache size to self.cache_max'''
with self._cache_lock:
# None (that means there was an error reading the block).
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
sm = sum([slot.size() for slot in self._cache])
- while len(self._cache) > 0 and sm > self.cache_max:
+ while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
for i in range(len(self._cache)-1, -1, -1):
+ # start from the back, find a slot that is a candidate to evict
if self._cache[i].ready.is_set():
+ sz = self._cache[i].size()
+
+ # If evict returns false it means the
+ # underlying disk cache couldn't lock the file
+ # for deletion because another process was using
+ # it. Don't count it as reducing the amount
+ # of data in the cache, find something else to
+ # throw out.
+ if self._cache[i].evict():
+ sm -= sz
+
+ # either way we forget about it. either the
+ # other process will delete it, or if we need
+ # it again and it is still there, we'll find
+ # it on disk.
del self._cache[i]
break
- sm = sum([slot.size() for slot in self._cache])
def _get(self, locator):
# Test if the locator is already in the cache
del self._cache[i]
self._cache.insert(0, n)
return n
+ if self._disk_cache:
+ # see if it exists on disk
+ n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+ if n is not None:
+ self._cache.insert(0, n)
+ return n
return None
def get(self, locator):
return n, False
else:
# Add a new cache slot for the locator
- n = KeepBlockCache.CacheSlot(locator)
+ if self._disk_cache:
+ n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+ else:
+ n = KeepBlockCache.CacheSlot(locator)
self._cache.insert(0, n)
return n, True
+ def set(self, slot, blob):
+ tryagain = False
+
+ try:
+ slot.set(blob)
+ except OSError as e:
+ tryagain = True
+ if e.errno == errno.ENOMEM:
+ # Reduce max slots to current - 4, cap cache and retry
+ with self._cache_lock:
+ self._max_slots = max(4, len(self._cache) - 4)
+ elif e.errno == errno.ENOSPC:
+ # Reduce disk max space to current - 256 MiB, cap cache and retry
+ with self._cache_lock:
+ sm = sum([st.size() for st in self._cache])
+ self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+ elif e.errno == errno.ENODEV:
+ _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+ except Exception as e:
+ tryagain = True
+
+ try:
+ if tryagain:
+ # There was an error. Evict some slots and try again.
+ self.cap_cache()
+ slot.set(blob)
+ except Exception as e:
+ # It failed again. Give up.
+ raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+ finally:
+ # Set the notice that that we are done with the cache
+ # slot one way or another.
+ slot.ready.set()
+
+ self.cap_cache()
+
class Counter(object):
def __init__(self, v=0):
self._lk = threading.Lock()
return blob
finally:
if slot is not None:
- slot.set(blob)
- self.block_cache.cap_cache()
+ self.block_cache.set(slot, blob)
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?