import timer
import datetime
import ssl
+import socket
_logger = logging.getLogger('arvados.keep')
global_client_object = None
def put(data, **kwargs):
return Keep.global_client_object().put(data, **kwargs)
+class KeepBlockCache(object):
+ # Default RAM cache is 256MiB
+ def __init__(self, cache_max=(256 * 1024 * 1024)):
+ self.cache_max = cache_max
+ self._cache = []
+ self._cache_lock = threading.Lock()
+
+ class CacheSlot(object):
+ def __init__(self, locator):
+ self.locator = locator
+ self.ready = threading.Event()
+ self.content = None
+
+ def get(self):
+ self.ready.wait()
+ return self.content
+
+ def set(self, value):
+ self.content = value
+ self.ready.set()
+
+ def size(self):
+            if self.content is None:
+ return 0
+ else:
+ return len(self.content)
+
+ def cap_cache(self):
+ '''Cap the cache size to self.cache_max'''
+ self._cache_lock.acquire()
+ try:
+ # Select all slots except those where ready.is_set() and content is
+ # None (that means there was an error reading the block).
+ self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+ sm = sum([slot.size() for slot in self._cache])
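+            # Evict ready slots, starting from the least recently used end of
+            # the list, until the total cached size fits within cache_max.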
+ while len(self._cache) > 0 and sm > self.cache_max:
+ for i in xrange(len(self._cache)-1, -1, -1):
+ if self._cache[i].ready.is_set():
+ del self._cache[i]
+ break
+ sm = sum([slot.size() for slot in self._cache])
+ finally:
+ self._cache_lock.release()
+
+ def reserve_cache(self, locator):
+ '''Reserve a cache slot for the specified locator,
+ or return the existing slot.'''
+ self._cache_lock.acquire()
+ try:
+ # Test if the locator is already in the cache
+ for i in xrange(0, len(self._cache)):
+ if self._cache[i].locator == locator:
+ n = self._cache[i]
+ if i != 0:
+ # move it to the front
+ del self._cache[i]
+ self._cache.insert(0, n)
+ return n, False
+
+ # Add a new cache slot for the locator
+ n = KeepBlockCache.CacheSlot(locator)
+ self._cache.insert(0, n)
+ return n, True
+ finally:
+ self._cache_lock.release()
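+
+# A minimal sketch of how a KeepBlockCache is meant to be driven (illustrative
+# only; KeepClient.get() below is the real caller, and fetch_block here is a
+# hypothetical stand-in for an actual Keep read):
+#
+#     cache = KeepBlockCache(cache_max=64 * 1024 * 1024)
+#     slot, first = cache.reserve_cache(locator)
+#     if first:
+#         slot.set(fetch_block(locator))  # fill the slot we just reserved
+#         cache.cap_cache()               # trim ready slots back under cache_max
+#     data = slot.get()                   # waits until some thread calls set()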
class KeepClient(object):
class ThreadLimiter(object):
class KeepService(object):
# Make requests to a single Keep service, and track results.
HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
- ssl.SSLError)
+ socket.error, ssl.SSLError)
def __init__(self, root, **headers):
self.root = root
resp_md5 = hashlib.md5(content).hexdigest()
if resp_md5 == locator.md5sum:
return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
return None
def put(self, http, hash_s, body):
self.service.last_result[1])
- def __init__(self, api_client=None, proxy=None, timeout=60,
- api_token=None, local_store=None):
+ def __init__(self, api_client=None, proxy=None, timeout=300,
+ api_token=None, local_store=None, block_cache=None,
+ num_retries=0):
"""Initialize a new KeepClient.
Arguments:
of the ARVADOS_KEEP_PROXY configuration setting. If you want to
ensure KeepClient does not use a proxy, pass in an empty string.
* timeout: The timeout for all HTTP requests, in seconds. Default
- 60.
+ 300.
* api_token: If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
+ * num_retries: The default number of times to retry failed requests.
+ This will be used as the default num_retries value when get() and
+ put() are called. Default 0.
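+
+        A construction sketch (the argument values are illustrative, not
+        recommendations):
+
+            keep_client = KeepClient(api_client=arvados.api('v1'),
+                                     num_retries=3,
+                                     block_cache=KeepBlockCache(512 * 1024 * 1024))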
"""
self.lock = threading.Lock()
if proxy is None:
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
+ self.block_cache = block_cache if block_cache else KeepBlockCache()
+
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
self.timeout = timeout
- self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
- self._cache = []
- self._cache_lock = threading.Lock()
+ self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
proxy += '/'
if not keep_services:
raise arvados.errors.NoKeepServersError()
- self.using_proxy = (keep_services[0].get('service_type') ==
- 'proxy')
+ self.using_proxy = any(ks.get('service_type') == 'proxy'
+ for ks in keep_services)
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_services)
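+            # Note the bracketed host: IPv6 literal addresses need the
+            # http://[addr]:port form to produce a valid URL.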
+ roots = ("{}://[{}]:{:d}/".format(
+ 'https' if ks['service_ssl_flag'] else 'http',
+ ks['service_host'],
+ ks['service_port'])
+ for ks in keep_services)
self.service_roots = sorted(set(roots))
_logger.debug(str(self.service_roots))
_logger.debug(str(pseq))
return pseq
- class CacheSlot(object):
- def __init__(self, locator):
- self.locator = locator
- self.ready = threading.Event()
- self.content = None
-
- def get(self):
- self.ready.wait()
- return self.content
-
- def set(self, value):
- self.content = value
- self.ready.set()
-
- def size(self):
- if self.content == None:
- return 0
- else:
- return len(self.content)
-
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
- self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
- sm = sum([slot.size() for slot in self._cache])
- while sm > self.cache_max:
- del self._cache[-1]
- sm = sum([slot.size() for a in self._cache])
- finally:
- self._cache_lock.release()
-
- def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
- or return the existing slot.'''
- self._cache_lock.acquire()
- try:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
-
- # Add a new cache slot for the locator
- n = KeepClient.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
else:
return None
- def get(self, loc_s, num_retries=0):
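+    # retry.retry_method fills in self.num_retries whenever the caller leaves
+    # num_retries at None, so an explicit per-call value still takes precedence.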
+ @retry.retry_method
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
*each* Keep server if it returns temporary failures, with
exponential backoff. Note that, in each loop, the method may try
to fetch data from every available Keep service, along with any
- that are named in location hints in the locator. Default 0.
+ that are named in location hints in the locator. The default value
+ is set when the KeepClient is initialized.
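+
+        For example (the locator below is the md5 of "foo" plus a size hint,
+        shown only for illustration):
+
+            data = keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3',
+                                   num_retries=2)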
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
- slot, first = self.reserve_cache(expect_hash)
+ slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
return v
# Always cache the result, then return it if we succeeded.
slot.set(blob)
- self.cap_cache()
+ self.block_cache.cap_cache()
if loop.success():
return blob
else:
raise arvados.errors.KeepReadError(loc_s)
- def put(self, data, copies=2, num_retries=0):
+ @retry.retry_method
+ def put(self, data, copies=2, num_retries=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
Default 2.
* num_retries: The number of times to retry PUT requests to
*each* Keep server if it returns temporary failures, with
- exponential backoff. Default 0.
+ exponential backoff. The default value is set when the
+ KeepClient is initialized.
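+
+        For example (values shown only for illustration):
+
+            locator = keep_client.put('foo', copies=2, num_retries=2)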
"""
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, copies, thread_limiter.done()))
- def local_store_put(self, data):
+ # Local storage methods need no-op num_retries arguments to keep
+ # integration tests happy. With better isolation they could
+ # probably be removed again.
+ def local_store_put(self, data, num_retries=0):
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s):
+ def local_store_get(self, loc_s, num_retries=0):
try:
locator = KeepLocator(loc_s)
except ValueError: