import timer
import datetime
import ssl
+import socket
_logger = logging.getLogger('arvados.keep')
global_client_object = None
import arvados
import arvados.config as config
import arvados.errors
+import arvados.retry as retry
import arvados.util
class KeepLocator(object):
def put(data, **kwargs):
return Keep.global_client_object().put(data, **kwargs)
+class KeepBlockCache(object):
+    """Thread-safe, in-memory cache of Keep block contents.
+
+    Slots are kept in most-recently-used-first order, and the total
+    cached content size is capped (approximately) at cache_max bytes.
+    """
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        # cache_max: soft upper bound, in bytes, on total cached content.
+        self.cache_max = cache_max
+        # List of CacheSlot objects, most recently used first.
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        """One cache entry: readers block in get() until the fetching
+        thread publishes the block content with set()."""
+        def __init__(self, locator):
+            self.locator = locator
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            # Wait until set() has been called, then return the content.
+            # A return value of None means the fetch failed.
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            # Bytes of cached content; 0 while unfilled or after a
+            # failed fetch.
+            if self.content == None:
+                return 0
+            else:
+                return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        self._cache_lock.acquire()
+        try:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum([slot.size() for slot in self._cache])
+            while len(self._cache) > 0 and sm > self.cache_max:
+                # Evict the least recently used slot whose content is
+                # ready; slots still being fetched are never evicted.
+                for i in xrange(len(self._cache)-1, -1, -1):
+                    if self._cache[i].ready.is_set():
+                        del self._cache[i]
+                        break
+                sm = sum([slot.size() for slot in self._cache])
+        finally:
+            self._cache_lock.release()
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator,
+        or return the existing slot.'''
+        # Returns (slot, first): first is True when the caller created
+        # the slot and is responsible for fetching the block and
+        # calling slot.set() (with None on failure) so waiters wake up.
+        self._cache_lock.acquire()
+        try:
+            # Test if the locator is already in the cache
+            for i in xrange(0, len(self._cache)):
+                if self._cache[i].locator == locator:
+                    n = self._cache[i]
+                    if i != 0:
+                        # move it to the front
+                        del self._cache[i]
+                        self._cache.insert(0, n)
+                    return n, False
+
+            # Add a new cache slot for the locator
+            n = KeepBlockCache.CacheSlot(locator)
+            self._cache.insert(0, n)
+            return n, True
+        finally:
+            self._cache_lock.release()
class KeepClient(object):
class ThreadLimiter(object):
return self._done
+    class KeepService(object):
+        # Make requests to a single Keep service, and track results.
+        # NOTE(review): when using https, timeouts can surface as
+        # ssl.SSLError, which is why it is listed here.
+        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+                       socket.error, ssl.SSLError)
+
+        def __init__(self, root, **headers):
+            # root: service root URL (expected to end with '/').
+            # headers: extra HTTP headers sent with every request.
+            self.root = root
+            # last_result: the (response, content) tuple from the last
+            # request, or the exception it raised.
+            self.last_result = None
+            # success_flag: None until a request completes, then the
+            # value of retry.check_http_response_success for it.
+            self.success_flag = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+
+        def usable(self):
+            # False only after a permanent failure.
+            return self.success_flag is not False
+
+        def finished(self):
+            # True once a request has definitively succeeded or failed.
+            return self.success_flag is not None
+
+        def last_status(self):
+            # HTTP status of the last response, or None if the last
+            # attempt raised an exception (last_result is then the
+            # exception, not a (response, content) tuple).
+            try:
+                return int(self.last_result[0].status)
+            except (AttributeError, IndexError, ValueError):
+                return None
+
+        def get(self, http, locator):
+            # http is an httplib2.Http object.
+            # locator is a KeepLocator object.
+            # Returns the block content on success -- after verifying
+            # its MD5 against the locator -- otherwise None.
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            try:
+                with timer.Timer() as t:
+                    result = http.request(url.encode('utf-8'), 'GET',
+                                          headers=self.get_headers)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+                content = result[1]
+                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+                             self.last_status(), len(content), t.msecs,
+                             (len(content)/(1024.0*1024))/t.secs)
+                if self.success_flag:
+                    resp_md5 = hashlib.md5(content).hexdigest()
+                    if resp_md5 == locator.md5sum:
+                        return content
+                    # Server returned 2xx but the data is corrupt.
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
+            return None
+
+        def put(self, http, hash_s, body):
+            # PUT body to this service under hash hash_s.  Returns the
+            # resulting success flag (see __init__); the caller can
+            # inspect last_result / last_status for details.
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            try:
+                result = http.request(url.encode('utf-8'), 'PUT',
+                                      headers=self.put_headers, body=body)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+            return self.success_flag
+
+
class KeepWriterThread(threading.Thread):
"""
Write a blob of data to the given Keep server. On success, call
save_response() of the given ThreadLimiter to save the returned
locator.
"""
- def __init__(self, api_token, **kwargs):
+ def __init__(self, keep_service, **kwargs):
super(KeepClient.KeepWriterThread, self).__init__()
- self._api_token = api_token
+ self.service = keep_service
self.args = kwargs
self._success = False
self.run_with_limiter(limiter)
def run_with_limiter(self, limiter):
+ if self.service.finished():
+ return
_logger.debug("KeepWriterThread %s proceeding %s %s",
str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root'])
h = httplib2.Http(timeout=self.args.get('timeout', None))
- url = self.args['service_root'] + self.args['data_hash']
- headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
- if self.args['using_proxy']:
- # We're using a proxy, so tell the proxy how many copies we
- # want it to store
- headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
- try:
- _logger.debug("Uploading to {}".format(url))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=self.args['data'])
- if (resp['status'] == '401' and
- re.match(r'Timestamp verification failed', content)):
- body = KeepClient.sign_for_old_server(
- self.args['data_hash'],
- self.args['data'])
- h = httplib2.Http(timeout=self.args.get('timeout', None))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=body)
- if re.match(r'^2\d\d$', resp['status']):
- self._success = True
- _logger.debug("KeepWriterThread %s succeeded %s %s",
- str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root'])
+ self._success = bool(self.service.put(
+ h, self.args['data_hash'], self.args['data']))
+ status = self.service.last_status()
+ if self._success:
+ resp, body = self.service.last_result
+ _logger.debug("KeepWriterThread %s succeeded %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
+ try:
+ replicas_stored = int(resp['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
replicas_stored = 1
- if 'x-keep-replicas-stored' in resp:
- # Tick the 'done' counter for the number of replica
- # reported stored by the server, for the case that
- # we're talking to a proxy or other backend that
- # stores to multiple copies for us.
- try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
- except ValueError:
- pass
- limiter.save_response(content.strip(), replicas_stored)
- else:
- _logger.debug("Request fail: PUT %s => %s %s",
- url, resp['status'], content)
- except (httplib2.HttpLib2Error,
- httplib.HTTPException,
- ssl.SSLError) as e:
- # When using https, timeouts look like ssl.SSLError from here.
- # "SSLError: The write operation timed out"
- _logger.debug("Request fail: PUT %s => %s: %s",
- url, type(e), str(e))
+ limiter.save_response(body.strip(), replicas_stored)
+ elif status is not None:
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.args['data_hash'], status,
+ self.service.last_result[1])
- def __init__(self, api_client=None, proxy=None, timeout=60,
- api_token=None, local_store=None):
+ def __init__(self, api_client=None, proxy=None, timeout=300,
+ api_token=None, local_store=None, block_cache=None,
+ num_retries=0):
"""Initialize a new KeepClient.
Arguments:
of the ARVADOS_KEEP_PROXY configuration setting. If you want to
ensure KeepClient does not use a proxy, pass in an empty string.
* timeout: The timeout for all HTTP requests, in seconds. Default
- 60.
+ 300.
* api_token: If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
+ * num_retries: The default number of times to retry failed requests.
+ This will be used as the default num_retries value when get() and
+ put() are called. Default 0.
"""
self.lock = threading.Lock()
if proxy is None:
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
+ self.block_cache = block_cache if block_cache else KeepBlockCache()
+
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
self.timeout = timeout
- self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
- self._cache = []
- self._cache_lock = threading.Lock()
+ self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
self.service_roots = [proxy]
self.using_proxy = True
+ self.static_service_roots = True
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
self.api_token = api_client.api_token
self.service_roots = None
self.using_proxy = None
+ self.static_service_roots = False
- def shuffled_service_roots(self, hash):
- if self.service_roots is None:
- with self.lock:
- try:
- keep_services = self.api_client.keep_services().accessible()
- except Exception: # API server predates Keep services.
- keep_services = self.api_client.keep_disks().list()
+ def build_service_roots(self, force_rebuild=False):
+ if (self.static_service_roots or
+ (self.service_roots and not force_rebuild)):
+ return
+ with self.lock:
+ try:
+ keep_services = self.api_client.keep_services().accessible()
+ except Exception: # API server predates Keep services.
+ keep_services = self.api_client.keep_disks().list()
+
+ keep_services = keep_services.execute().get('items')
+ if not keep_services:
+ raise arvados.errors.NoKeepServersError()
- keep_services = keep_services.execute().get('items')
- if not keep_services:
- raise arvados.errors.NoKeepServersError()
+ self.using_proxy = any(ks.get('service_type') == 'proxy'
+ for ks in keep_services)
- self.using_proxy = (keep_services[0].get('service_type') ==
- 'proxy')
+ roots = ("{}://[{}]:{:d}/".format(
+ 'https' if ks['service_ssl_flag'] else 'http',
+ ks['service_host'],
+ ks['service_port'])
+ for ks in keep_services)
+ self.service_roots = sorted(set(roots))
+ _logger.debug(str(self.service_roots))
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
+ def shuffled_service_roots(self, hash, force_rebuild=False):
+ self.build_service_roots(force_rebuild)
# Build an ordering with which to query the Keep servers based on the
# contents of the hash.
_logger.debug(str(pseq))
return pseq
- class CacheSlot(object):
- def __init__(self, locator):
- self.locator = locator
- self.ready = threading.Event()
- self.content = None
-
- def get(self):
- self.ready.wait()
- return self.content
- def set(self, value):
- self.content = value
- self.ready.set()
-
- def size(self):
- if self.content == None:
- return 0
- else:
- return len(self.content)
+    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+        # roots_map is a dictionary, mapping Keep service root strings
+        # to KeepService objects.  Poll for Keep services, and add any
+        # new ones to roots_map.  Return the current list of local
+        # root strings.
+        # Every newly created KeepService inherits these headers;
+        # Authorization defaults to this client's API token unless the
+        # caller supplied its own.
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        for root in local_roots:
+            if root not in roots_map:
+                roots_map[root] = self.KeepService(root, **headers)
+        return local_roots
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
- self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
- sm = sum([slot.size() for slot in self._cache])
- while sm > self.cache_max:
- del self._cache[-1]
- sm = sum([slot.size() for a in self._cache])
- finally:
- self._cache_lock.release()
+    @staticmethod
+    def _check_loop_result(result):
+        # KeepClient RetryLoops should save results as a 2-tuple: the
+        # actual result of the request, and the number of servers available
+        # to receive the request this round.
+        # This method returns True if there's a real result, False if
+        # there are no more servers available, otherwise None.
+        if isinstance(result, Exception):
+            # An exception (e.g. failure listing services) means
+            # "no verdict yet; keep retrying".
+            return None
+        result, tried_server_count = result
+        if (result is not None) and (result is not False):
+            return True
+        elif tried_server_count < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        else:
+            return None
- def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
- or return the existing slot.'''
- self._cache_lock.acquire()
- try:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
+ @retry.retry_method
+ def get(self, loc_s, num_retries=None):
+ """Get data from Keep.
- # Add a new cache slot for the locator
- n = KeepClient.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
+ This method fetches one or more blocks of data from Keep. It
+        sends a request to each Keep service registered with the API
+ server (or the proxy provided when this client was
+ instantiated), then each service named in location hints, in
+ sequence. As soon as one service provides the data, it's
+ returned.
- def get(self, loc_s):
+ Arguments:
+ * loc_s: A string of one or more comma-separated locators to fetch.
+ This method returns the concatenation of these blocks.
+ * num_retries: The number of times to retry GET requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. Note that, in each loop, the method may try
+ to fetch data from every available Keep service, along with any
+ that are named in location hints in the locator. The default value
+ is set when the KeepClient is initialized.
+ """
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
- slot, first = self.reserve_cache(expect_hash)
-
+ slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
return v
- try:
- for service_root in self.shuffled_service_roots(expect_hash):
- url = service_root + loc_s
- headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
- 'Accept': 'application/octet-stream'}
- blob = self.get_url(url, headers, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
-
- for hint in locator.hints:
- if not hint.startswith('K@'):
- continue
- url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
- blob = self.get_url(url, {}, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
- except:
- slot.set(None)
- self.cap_cache()
- raise
-
- slot.set(None)
- self.cap_cache()
- raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
-
- def get_url(self, url, headers, expect_hash):
- h = httplib2.Http()
- try:
- _logger.debug("Request: GET %s", url)
- with timer.Timer() as t:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
- len(content), t.msecs,
- (len(content)/(1024*1024))/t.secs)
- if re.match(r'^2\d\d$', resp['status']):
- md5 = hashlib.md5(content).hexdigest()
- if md5 == expect_hash:
- return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
- except Exception as e:
- _logger.debug("Request fail: GET %s => %s: %s",
- url, type(e), str(e))
- return None
-
- def put(self, data, copies=2):
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@')]
+        # Map root URLs to their KeepService objects.
+ roots_map = {root: self.KeepService(root) for root in hint_roots}
+ blob = None
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ local_roots = self.map_new_services(
+ roots_map, expect_hash,
+ force_rebuild=(tries_left < num_retries))
+ except Exception as error:
+ loop.save_result(error)
+ continue
+
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
+ services_to_try = [roots_map[root]
+ for root in (local_roots + hint_roots)
+ if roots_map[root].usable()]
+ http = httplib2.Http(timeout=self.timeout)
+ for keep_service in services_to_try:
+ blob = keep_service.get(http, locator)
+ if blob is not None:
+ break
+ loop.save_result((blob, len(services_to_try)))
+
+ # Always cache the result, then return it if we succeeded.
+ slot.set(blob)
+ self.block_cache.cap_cache()
+ if loop.success():
+ return blob
+
+ # No servers fulfilled the request. Count how many responded
+ # "not found;" if the ratio is high enough (currently 75%), report
+ # Not Found; otherwise a generic error.
+ # Q: Including 403 is necessary for the Keep tests to continue
+ # passing, but maybe they should expect KeepReadError instead?
+ not_founds = sum(1 for ks in roots_map.values()
+ if ks.last_status() in set([403, 404, 410]))
+ if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
+ raise arvados.errors.NotFoundError(loc_s)
+ else:
+ raise arvados.errors.KeepReadError(loc_s)
+
+ @retry.retry_method
+ def put(self, data, copies=2, num_retries=None):
+ """Save data in Keep.
+
+ This method will get a list of Keep services from the API server, and
+ send the data to each one simultaneously in a new thread. Once the
+ uploads are finished, if enough copies are saved, this method returns
+ the most recent HTTP response body. If requests fail to upload
+ enough copies, this method raises KeepWriteError.
+
+ Arguments:
+ * data: The string of data to upload.
+ * copies: The number of copies that the user requires be saved.
+ Default 2.
+ * num_retries: The number of times to retry PUT requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. The default value is set when the
+ KeepClient is initialized.
+ """
data_hash = hashlib.md5(data).hexdigest()
- have_copies = 0
- want_copies = copies
- if not (want_copies > 0):
+ if copies < 1:
return data_hash
- threads = []
- thread_limiter = KeepClient.ThreadLimiter(want_copies)
- for service_root in self.shuffled_service_roots(data_hash):
- t = KeepClient.KeepWriterThread(
- self.api_token,
- data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- timeout=self.timeout,
- using_proxy=self.using_proxy,
- want_copies=(want_copies if self.using_proxy else 1))
- t.start()
- threads += [t]
- for t in threads:
- t.join()
- if thread_limiter.done() < want_copies:
- # Retry the threads (i.e., services) that failed the first
- # time around.
- threads_retry = []
+
+ headers = {}
+ if self.using_proxy:
+ # Tell the proxy how many copies we want it to store
+ headers['X-Keep-Desired-Replication'] = str(copies)
+ roots_map = {}
+ thread_limiter = KeepClient.ThreadLimiter(copies)
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ local_roots = self.map_new_services(
+ roots_map, data_hash,
+ force_rebuild=(tries_left < num_retries), **headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
+
+ threads = []
+ for service_root, ks in roots_map.iteritems():
+ if ks.finished():
+ continue
+ t = KeepClient.KeepWriterThread(
+ ks,
+ data=data,
+ data_hash=data_hash,
+ service_root=service_root,
+ thread_limiter=thread_limiter,
+ timeout=self.timeout)
+ t.start()
+ threads.append(t)
for t in threads:
- if not t.success():
- _logger.debug("Retrying: PUT %s %s",
- t.args['service_root'],
- t.args['data_hash'])
- retry_with_args = t.args.copy()
- t_retry = KeepClient.KeepWriterThread(self.api_token,
- **retry_with_args)
- t_retry.start()
- threads_retry += [t_retry]
- for t in threads_retry:
t.join()
- have_copies = thread_limiter.done()
- # If we're done, return the response from Keep
- if have_copies >= want_copies:
+ loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+ if loop.success():
return thread_limiter.response()
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
- (data_hash, want_copies, have_copies))
-
- @staticmethod
- def sign_for_old_server(data_hash, data):
- return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
+ (data_hash, copies, thread_limiter.done()))
- def local_store_put(self, data):
+ # Local storage methods need no-op num_retries arguments to keep
+ # integration tests happy. With better isolation they could
+ # probably be removed again.
+ def local_store_put(self, data, num_retries=0):
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s):
+ def local_store_get(self, loc_s, num_retries=0):
try:
locator = KeepLocator(loc_s)
except ValueError: