import collections
import datetime
import hashlib
+import errno
import io
import logging
import math
import ssl
import sys
import threading
+import resource
from . import timer
import urllib.parse
+import traceback
+import weakref
if sys.version_info >= (3, 0):
from io import BytesIO
import arvados.errors
import arvados.retry as retry
import arvados.util
+import arvados.diskcache
+from arvados._pycurlhelper import PyCurlHelper
_logger = logging.getLogger('arvados.keep')
global_client_object = None
config.get('ARVADOS_API_TOKEN'),
config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
config.get('ARVADOS_KEEP_PROXY'),
- config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
os.environ.get('KEEP_LOCAL_STORE'))
if (global_client_object is None) or (cls._last_key != key):
global_client_object = KeepClient()
return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
    """Set up an in-memory or on-disk Keep block cache.

    Args:
        cache_max: Maximum total bytes to cache.  0 selects a default:
            256 MiB for the RAM cache.  For the disk cache it is the
            smallest of 10% of the filesystem size, 25% of the available
            space (counting space already used by the cache) and
            max_slots * 64 MiB.  The final value is never below 64 MiB.
        max_slots: Maximum number of cached blocks.  0 selects a default:
            512 for the RAM cache, or 1/8 of the RLIMIT_NOFILE soft limit
            (at least 1) for the disk cache.
        disk_cache: If true, keep blocks in files under disk_cache_dir
            instead of in memory.
        disk_cache_dir: Directory for the disk cache.  Defaults to
            ~/.cache/arvados/keep.  Created with mode 0700 if missing.
    """
    self.cache_max = cache_max
    self._cache = []
    self._cache_lock = threading.Lock()
    self._max_slots = max_slots
    self._disk_cache = disk_cache
    self._disk_cache_dir = disk_cache_dir
    # Shares _cache_lock so reserve_cache() can wait on it for slots
    # released by cap_cache().
    self._cache_updating = threading.Condition(self._cache_lock)

    if self._disk_cache:
        if self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
        # Also create a caller-supplied directory: os.statvfs() below
        # fails on a path that does not exist yet.
        os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

    if self._max_slots == 0:
        if self._disk_cache:
            # Each block uses two file descriptors, one used to
            # open it initially and hold the flock(), and a second
            # hidden one used by mmap().
            #
            # Set max slots to 1/8 of maximum file handles. This
            # means we'll use at most 1/4 of total file handles.
            #
            # NOFILE typically defaults to 1024 on Linux so this
            # is 128 slots (256 file handles), which means we can
            # cache up to 8 GiB of 64 MiB blocks. This leaves
            # 768 file handles for sockets and other stuff.
            #
            # When we want the ability to have more cache (e.g. in
            # arv-mount) we'll increase rlimit before calling
            # this.
            #
            # Never go below one slot: with zero slots reserve_cache()
            # would wait forever for a slot that can never exist.
            nofile_soft = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
            self._max_slots = max(1, int(nofile_soft // 8))
        else:
            # RAM cache slots
            self._max_slots = 512

    if self.cache_max == 0:
        if self._disk_cache:
            fs = os.statvfs(self._disk_cache_dir)
            # Calculation of available space incorporates existing cache usage
            existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
            # Integer division keeps cache_max an integer byte count.
            avail = (fs.f_bavail * fs.f_bsize + existing_usage) // 4
            maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
            # pick smallest of:
            #   10% of total disk size
            #   25% of available space
            #   max_slots * 64 MiB
            self.cache_max = min(maxdisk, avail, self._max_slots * 64 * 1024 * 1024)
        else:
            # 256 MiB in RAM
            self.cache_max = 256 * 1024 * 1024

    self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

    if self._disk_cache:
        # Adopt blocks already on disk, then trim to the limits above.
        self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
        self.cap_cache()
+
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
else:
return len(self.content)
def evict(self):
    """Release the content held by this slot.

    Returns:
        Whatever gone() reports after the content is cleared.
    """
    self.content = None
    released = self.gone()
    return released
+
def gone(self):
    """Return True once this slot no longer holds any content."""
    current = self.content
    return current is None
+
def _resize_cache(self, cache_max, max_slots):
    """Evict ready slots until the cache fits within the given limits.

    Callers (cap_cache, reserve_cache) hold self._cache_lock via
    self._cache_updating while calling this.

    Slots that are ready but hold no content (an error reading the
    block) are dropped first.  Remaining slots are then considered from
    the back of self._cache toward the front.  _get() and
    reserve_cache() insert at index 0, so the back is the least recently
    used.  Each slot is visited at most once:

    * slots still being filled (ready not set) are skipped;
    * a slot whose evict() returns false is not counted as freeing
      space, but is still forgotten if gone() reports it released;
    * a slot that is not gone() after evict() stays in the list.

    This may therefore return with the limits still exceeded.
    reserve_cache() handles that by waiting and calling again, instead of
    this method spinning forever while holding the lock.

    Args:
        cache_max: Upper bound on the total size() of all slots.
        max_slots: Upper bound on the number of slots.
    """
    # Select all slots except those where ready.is_set() and content is
    # None (that means there was an error reading the block).
    self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
    sm = sum(slot.size() for slot in self._cache)

    # Walking indices backwards keeps earlier indices valid across deletes.
    for i in range(len(self._cache) - 1, -1, -1):
        if sm <= cache_max and len(self._cache) <= max_slots:
            break
        slot = self._cache[i]
        if not slot.ready.is_set():
            # Still in flight; cannot evict yet.
            continue

        sz = slot.size()
        # If evict returns false it means the underlying disk cache
        # couldn't lock the file for deletion because another process
        # was using it.  Don't count it as reducing the amount of data
        # in the cache; keep looking for something else to throw out.
        if slot.evict():
            sm -= sz

        # Check to make sure the underlying data is gone.  Either way we
        # forget about it: either the other process will delete it, or
        # if we need it again and it is still there, we'll find it on disk.
        if slot.gone():
            del self._cache[i]
+
+
def cap_cache(self):
    """Trim the cache to self.cache_max bytes and self._max_slots slots.

    Afterwards, every thread waiting on self._cache_updating (for example
    in reserve_cache) is woken so it can re-check for a free slot.
    """
    updating = self._cache_updating
    with updating:
        self._resize_cache(self.cache_max, self._max_slots)
        updating.notify_all()
def _get(self, locator):
# Test if the locator is already in the cache
del self._cache[i]
self._cache.insert(0, n)
return n
+ if self._disk_cache:
+ # see if it exists on disk
+ n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+ if n is not None:
+ self._cache.insert(0, n)
+ return n
return None
def get(self, locator):
def reserve_cache(self, locator):
    '''Reserve a cache slot for the specified locator,
    or return the existing slot.

    Returns a ``(slot, first)`` tuple.  ``first`` is True when a new
    slot was created and the caller is expected to fill it via set().
    It is False when a slot for ``locator`` already existed.

    When every slot is occupied this blocks, re-checking every 200 ms or
    whenever another thread notifies self._cache_updating (cap_cache
    does), until _resize_cache() frees room.
    '''
    with self._cache_updating:
        n = self._get(locator)
        if n:
            return n, False

        # Add a new cache slot for the locator: first make room for it.
        self._resize_cache(self.cache_max, self._max_slots-1)
        while len(self._cache) >= self._max_slots:
            # If there isn't a slot available, need to wait
            # for something to happen that releases one of the
            # cache slots.  Idle for 200 ms or woken up by
            # another thread.
            self._cache_updating.wait(timeout=0.2)
            # wait() released the lock, so another thread may have
            # reserved this same locator meanwhile.  Reuse its slot
            # rather than inserting a duplicate that would fetch the
            # same block a second time.
            n = self._get(locator)
            if n:
                return n, False
            self._resize_cache(self.cache_max, self._max_slots-1)

        if self._disk_cache:
            n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
        else:
            n = KeepBlockCache.CacheSlot(locator)
        self._cache.insert(0, n)
        return n, True
def set(self, slot, blob):
    """Store *blob* in *slot*, adjusting cache limits and retrying once.

    If the first slot.set() raises OSError, limits are adjusted by errno:

    * ENOMEM: max slots becomes the current slot count minus 4 (at least 4).
    * ENOSPC: cache_max becomes the current total size minus 256 MiB
      (at least 256 MiB).
    * ENODEV: an error is logged, since the filesystem cannot memory-map.

    Any other error is logged as a warning.  In every case the cache is
    capped and the store is retried once.

    Raises:
        arvados.errors.KeepCacheError: if the retry also fails.  Before
            raising, the slot is set to None (presumably so threads
            waiting on it are not left blocked).
    """
    try:
        slot.set(blob)
        return
    except OSError as e:
        if e.errno == errno.ENOMEM:
            # Reduce max slots to current - 4, cap cache and retry
            with self._cache_lock:
                self._max_slots = max(4, len(self._cache) - 4)
        elif e.errno == errno.ENOSPC:
            # Reduce disk max space to current - 256 MiB, cap cache and retry
            with self._cache_lock:
                sm = sum(st.size() for st in self._cache)
                self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
        elif e.errno == errno.ENODEV:
            _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        else:
            _logger.warning("Error saving block %s to cache, retrying: %s", slot.locator, e)
    except Exception as e:
        # Previously swallowed silently; record it so a retry that
        # succeeds does not hide the original failure.
        _logger.warning("Error saving block %s to cache, retrying: %s", slot.locator, e)
    finally:
        # Check if we should evict things from the cache.  Either
        # because we added a new thing or there was an error and
        # we possibly adjusted the limits down, so we might need
        # to push something out.
        self.cap_cache()

    try:
        # Only gets here if there was an error the first time.  The
        # exception handler adjusts limits downward in some cases
        # to free up resources, which would make the operation
        # succeed.
        slot.set(blob)
    except Exception as e:
        # It failed again.  Give up.
        slot.set(None)
        raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) from e

    self.cap_cache()
+
class Counter(object):
def __init__(self, v=0):
self._lk = threading.Lock()
class KeepClient(object):
+ DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
+ DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
- # Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 256 seconds
- # Default Keep server bandwidth minimum: 32768 bytes per second
- # Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 256 seconds
- # Default Keep proxy bandwidth minimum: 32768 bytes per second
- DEFAULT_TIMEOUT = (2, 256, 32768)
- DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
-
-
- class KeepService(object):
+ class KeepService(PyCurlHelper):
"""Make requests to a single Keep service, and track results.
A KeepService is intended to last long enough to perform one
download_counter=None,
headers={},
insecure=False):
+ super(KeepClient.KeepService, self).__init__()
self.root = root
self._user_agent_pool = user_agent_pool
self._result = {'error': None}
except:
ua.close()
- def _socket_open(self, *args, **kwargs):
- if len(args) + len(kwargs) == 2:
- return self._socket_open_pycurl_7_21_5(*args, **kwargs)
- else:
- return self._socket_open_pycurl_7_19_3(*args, **kwargs)
-
- def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
- return self._socket_open_pycurl_7_21_5(
- purpose=None,
- address=collections.namedtuple(
- 'Address', ['family', 'socktype', 'protocol', 'addr'],
- )(family, socktype, protocol, address))
-
- def _socket_open_pycurl_7_21_5(self, purpose, address):
- """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
- s = socket.socket(address.family, address.socktype, address.protocol)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- # Will throw invalid protocol error on mac. This test prevents that.
- if hasattr(socket, 'TCP_KEEPIDLE'):
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
- self._socket = s
- return s
-
def get(self, locator, method="GET", timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
+ else:
+ curl.setopt(pycurl.HTTPGET, True)
self._setcurltimeouts(curl, timeout, method=="HEAD")
try:
self.upload_counter.add(len(body))
return True
- def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
- if not timeouts:
- return
- elif isinstance(timeouts, tuple):
- if len(timeouts) == 2:
- conn_t, xfer_t = timeouts
- bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
- else:
- conn_t, xfer_t, bandwidth_bps = timeouts
- else:
- conn_t, xfer_t = (timeouts, timeouts)
- bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
- curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- if not ignore_bandwidth:
- curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
- curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
-
- def _headerfunction(self, header_line):
- if isinstance(header_line, bytes):
- header_line = header_line.decode('iso-8859-1')
- if ':' in header_line:
- name, value = header_line.split(':', 1)
- name = name.strip().lower()
- value = value.strip()
- elif self._headers:
- name = self._lastheadername
- value = self._headers[name] + ' ' + header_line.strip()
- elif header_line.startswith('HTTP/'):
- name = 'x-status-line'
- value = header_line
- else:
- _logger.error("Unexpected header line: %s", header_line)
- return
- self._lastheadername = name
- self._headers[name] = value
- # Returning None implies all bytes were written
-
class KeepWriterQueue(queue.Queue):
def __init__(self, copies, classes=[]):
else:
return None
- def get_from_cache(self, loc):
+ def get_from_cache(self, loc_s):
"""Fetch a block only if is in the cache, otherwise return None."""
- slot = self.block_cache.get(loc)
+ locator = KeepLocator(loc_s)
+ slot = self.block_cache.get(locator.md5sum)
if slot is not None and slot.ready.is_set():
return slot.get()
else:
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
if method == "GET":
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
+ if prefetch:
+ # this is request for a prefetch, if it is
+ # already in flight, return immediately.
+ # clear 'slot' to prevent finally block from
+ # calling slot.set()
+ slot = None
+ return None
self.hits_counter.add(1)
blob = slot.get()
if blob is None:
return blob
finally:
if slot is not None:
- slot.set(blob)
- self.block_cache.cap_cache()
+ self.block_cache.set(slot, blob)
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
return True
if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
return True
-
- def is_cached(self, locator):
- return self.block_cache.reserve_cache(expect_hash)