import arvados
import arvados.config as config
import arvados.errors
+import arvados.retry as retry
import arvados.util
class KeepLocator(object):
return self._done
+    class KeepService(object):
+        """Make requests to a single Keep service, and track results.
+
+        Attributes:
+        * root: base URL of the service.  Request URLs are built by
+          appending a locator or hash string to it.
+        * last_result: what the most recent request produced: either the
+          value returned by http.request(), or the exception it raised.
+          None until a request has been attempted.
+        * success_flag: the most recent return value of
+          retry.check_http_response_success().  None until some request
+          has received an HTTP response.  (Presumably True means success
+          and False means permanent failure -- confirm against
+          arvados.retry.)
+        """
+        # Transport-level errors that get() and put() record in
+        # last_result instead of propagating.  When using https, timeouts
+        # look like ssl.SSLError from here.
+        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+                       ssl.SSLError)
+
+        def __init__(self, root, **headers):
+            """Set up tracking for the service at `root`.
+
+            Keyword arguments are sent as HTTP headers with every request.
+            GET requests also send an Accept header, which a caller-supplied
+            header of the same name overrides.
+            """
+            self.root = root
+            self.last_result = None
+            self.success_flag = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+
+        def usable(self):
+            """Return True unless success_flag is False."""
+            return self.success_flag is not False
+
+        def finished(self):
+            """Return True once success_flag has been set to a non-None value."""
+            return self.success_flag is not None
+
+        def last_status(self):
+            """Return the last HTTP status code as an int, or None.
+
+            None is returned when no request has been made yet, or when
+            last_result is not an HTTP response (e.g. it is an exception).
+            """
+            try:
+                return int(self.last_result[0].status)
+            # TypeError covers last_result being None (no request made yet):
+            # callers such as KeepClient.get() poll every known service,
+            # including ones that were never queried.
+            except (AttributeError, IndexError, TypeError, ValueError):
+                return None
+
+        def get(self, http, locator):
+            """Fetch a block from this service.
+
+            Arguments:
+            * http: an httplib2.Http object.
+            * locator: a KeepLocator object.
+
+            Returns the response body if the request succeeded and the
+            body's MD5 digest equals locator.md5sum; otherwise None.
+            Errors listed in HTTP_ERRORS are logged and stored in
+            last_result rather than raised.
+            """
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            try:
+                with timer.Timer() as t:
+                    result = http.request(url.encode('utf-8'), 'GET',
+                                          headers=self.get_headers)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+                content = result[1]
+                # Float arithmetic, so responses under 1 MiB don't all
+                # report 0 MiB/sec; guard against a zero elapsed time.
+                mib = len(content) / (1024.0 * 1024)
+                mib_per_sec = (mib / t.secs) if t.secs else 0.0
+                _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+                             self.last_status(), len(content), t.msecs,
+                             mib_per_sec)
+                if self.success_flag:
+                    resp_md5 = hashlib.md5(content).hexdigest()
+                    if resp_md5 == locator.md5sum:
+                        return content
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
+            return None
+
+        def put(self, http, hash_s, body):
+            """Send `body` to this service under the name `hash_s`.
+
+            Arguments:
+            * http: an httplib2.Http object.
+            * hash_s: string appended to the service root to form the URL.
+            * body: the data to upload.
+
+            Returns success_flag.  If the request raised one of HTTP_ERRORS,
+            the error is stored in last_result and success_flag keeps its
+            previous value.
+            """
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            try:
+                result = http.request(url.encode('utf-8'), 'PUT',
+                                      headers=self.put_headers, body=body)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+            return self.success_flag
+
+
class KeepWriterThread(threading.Thread):
"""
Write a blob of data to the given Keep server. On success, call
save_response() of the given ThreadLimiter to save the returned
locator.
"""
- def __init__(self, api_token, **kwargs):
+ def __init__(self, keep_service, **kwargs):
super(KeepClient.KeepWriterThread, self).__init__()
- self._api_token = api_token
+ self.service = keep_service
self.args = kwargs
self._success = False
self.run_with_limiter(limiter)
def run_with_limiter(self, limiter):
+ if self.service.finished():
+ return
_logger.debug("KeepWriterThread %s proceeding %s %s",
str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root'])
h = httplib2.Http(timeout=self.args.get('timeout', None))
- url = self.args['service_root'] + self.args['data_hash']
- headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
- if self.args['using_proxy']:
- # We're using a proxy, so tell the proxy how many copies we
- # want it to store
- headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
- try:
- _logger.debug("Uploading to {}".format(url))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=self.args['data'])
- if re.match(r'^2\d\d$', resp['status']):
- self._success = True
- _logger.debug("KeepWriterThread %s succeeded %s %s",
- str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root'])
+ self._success = bool(self.service.put(
+ h, self.args['data_hash'], self.args['data']))
+ status = self.service.last_status()
+ if self._success:
+ resp, body = self.service.last_result
+ _logger.debug("KeepWriterThread %s succeeded %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
+ try:
+ replicas_stored = int(resp['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
replicas_stored = 1
- if 'x-keep-replicas-stored' in resp:
- # Tick the 'done' counter for the number of replica
- # reported stored by the server, for the case that
- # we're talking to a proxy or other backend that
- # stores to multiple copies for us.
- try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
- except ValueError:
- pass
- limiter.save_response(content.strip(), replicas_stored)
- else:
- _logger.debug("Request fail: PUT %s => %s %s",
- url, resp['status'], content)
- except (httplib2.HttpLib2Error,
- httplib.HTTPException,
- ssl.SSLError) as e:
- # When using https, timeouts look like ssl.SSLError from here.
- # "SSLError: The write operation timed out"
- _logger.debug("Request fail: PUT %s => %s: %s",
- url, type(e), str(e))
+ limiter.save_response(body.strip(), replicas_stored)
+ elif status is not None:
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.args['data_hash'], status,
+ self.service.last_result[1])
def __init__(self, api_client=None, proxy=None, timeout=60,
self.api_token = api_token
self.service_roots = [proxy]
self.using_proxy = True
+ self.static_service_roots = True
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
self.api_token = api_client.api_token
self.service_roots = None
self.using_proxy = None
+ self.static_service_roots = False
- def shuffled_service_roots(self, hash):
- if self.service_roots is None:
- with self.lock:
- try:
- keep_services = self.api_client.keep_services().accessible()
- except Exception: # API server predates Keep services.
- keep_services = self.api_client.keep_disks().list()
+    def build_service_roots(self, force_rebuild=False):
+        """Fetch the list of Keep services and cache their root URLs.
+
+        Does nothing when static_service_roots is set, or when
+        service_roots is already populated and force_rebuild is false.
+        Otherwise, while holding self.lock, asks the API server for its
+        Keep services, sets using_proxy from the first service's
+        service_type, and stores the sorted, de-duplicated root URLs in
+        service_roots.
+
+        Raises arvados.errors.NoKeepServersError if the API server
+        returns no services.
+        """
+        if self.static_service_roots:
+            return
+        if self.service_roots and not force_rebuild:
+            return
+        with self.lock:
+            try:
+                request = self.api_client.keep_services().accessible()
+            except Exception:  # API server predates Keep services.
+                request = self.api_client.keep_disks().list()
- keep_services = keep_services.execute().get('items')
- if not keep_services:
- raise arvados.errors.NoKeepServersError()
+            service_list = request.execute().get('items')
+            if not service_list:
+                raise arvados.errors.NoKeepServersError()
- self.using_proxy = (keep_services[0].get('service_type') ==
- 'proxy')
+            first_type = service_list[0].get('service_type')
+            self.using_proxy = (first_type == 'proxy')
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
+            # A set drops duplicate roots; sorting gives a stable order.
+            unique_roots = set(
+                "http%s://%s:%d/" % ('s' if svc['service_ssl_flag'] else '',
+                                     svc['service_host'],
+                                     svc['service_port'])
+                for svc in service_list)
+            self.service_roots = sorted(unique_roots)
+            _logger.debug(str(self.service_roots))
+
+ def shuffled_service_roots(self, hash, force_rebuild=False):
+ self.build_service_roots(force_rebuild)
# Build an ordering with which to query the Keep servers based on the
# contents of the hash.
finally:
self._cache_lock.release()
- def get(self, loc_s):
+    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+        """Add KeepService objects for newly seen service roots.
+
+        Arguments:
+        * roots_map: a dictionary mapping Keep service root strings to
+          KeepService objects.  It is updated in place.
+        * md5_s: hash passed to shuffled_service_roots().
+        * force_rebuild: passed to shuffled_service_roots().
+        * headers: HTTP headers for any KeepService created here.  An
+          Authorization header built from self.api_token is added unless
+          the caller supplied one.
+
+        Returns the value of shuffled_service_roots(md5_s, force_rebuild).
+        """
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        current_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        for service_root in current_roots:
+            if service_root in roots_map:
+                # Keep the existing object: it carries request history.
+                continue
+            roots_map[service_root] = self.KeepService(service_root,
+                                                       **headers)
+        return current_roots
+
+    @staticmethod
+    def _check_loop_result(result):
+        """Classify a result saved by one of KeepClient's RetryLoops.
+
+        `result` is either an Exception, or a 2-tuple of the actual result
+        of the request and the number of servers that were available to
+        receive the request this round.
+
+        Returns True if there's a real result (neither None nor False),
+        False if no servers were available, and None otherwise --
+        including when `result` is an Exception.
+        """
+        if isinstance(result, Exception):
+            return None
+        outcome, servers_tried = result
+        if outcome is not None and outcome is not False:
+            return True
+        if servers_tried < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        return None
+
+    def get(self, loc_s, num_retries=0):
+        """Get data from Keep.
+
+        This method fetches one or more blocks of data from Keep.  It
+        sends a request to each Keep service registered with the API
+        server (or the proxy provided when this client was
+        instantiated), then each service named in location hints, in
+        sequence.  As soon as one service provides the data, it's
+        returned.
+
+        Arguments:
+        * loc_s: A string of one or more comma-separated locators to fetch.
+          This method returns the concatenation of these blocks.
+        * num_retries: The number of times to retry GET requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Note that, in each loop, the method may try
+          to fetch data from every available Keep service, along with any
+          that are named in location hints in the locator.  Default 0.
+
+        Raises arvados.errors.NotFoundError if no service returned the
+        block and at least 75% of the services answered 403, 404 or 410;
+        raises arvados.errors.KeepReadError for any other failure.
+        """
if ',' in loc_s:
- return ''.join(self.get(x) for x in loc_s.split(','))
+            # Pass num_retries along so each block of a multi-locator
+            # request gets the retries the caller asked for.
+            return ''.join(self.get(x, num_retries=num_retries)
+                           for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
slot, first = self.reserve_cache(expect_hash)
-
if not first:
v = slot.get()
return v
- try:
- for service_root in self.shuffled_service_roots(expect_hash):
- url = service_root + loc_s
- headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
- 'Accept': 'application/octet-stream'}
- blob = self.get_url(url, headers, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
-
- for hint in locator.hints:
- if not hint.startswith('K@'):
- continue
- url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
- blob = self.get_url(url, {}, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
- except:
- slot.set(None)
- self.cap_cache()
- raise
-
- slot.set(None)
+        # See #3147 for a discussion of the loop implementation.  Highlights:
+        # * Refresh the list of Keep services after each failure, in case
+        #   it's being updated.
+        # * Retry until we succeed, we're out of retries, or every available
+        #   service has returned permanent failure.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@')]
+        # Map root URLs to their KeepService objects.  Hinted services
+        # are created without an Authorization header.
+        roots_map = {root: self.KeepService(root) for root in hint_roots}
+        blob = None
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                # Only force a refresh of the service list after the
+                # first attempt.
+                local_roots = self.map_new_services(
+                    roots_map, expect_hash,
+                    force_rebuild=(tries_left < num_retries))
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            # Query KeepService objects that haven't returned
+            # permanent failure, in our specified shuffle order.
+            services_to_try = [roots_map[root]
+                               for root in (local_roots + hint_roots)
+                               if roots_map[root].usable()]
+            http = httplib2.Http(timeout=self.timeout)
+            for keep_service in services_to_try:
+                blob = keep_service.get(http, locator)
+                if blob is not None:
+                    break
+            loop.save_result((blob, len(services_to_try)))
+
+        # Always cache the result, then return it if we succeeded.
+        slot.set(blob)
self.cap_cache()
- raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+        if loop.success():
+            return blob
+
+        # No servers fulfilled the request.  Count how many responded
+        # "not found;" if the ratio is high enough (currently 75%), report
+        # Not Found; otherwise a generic error.
+        # Q: Including 403 is necessary for the Keep tests to continue
+        # passing, but maybe they should expect KeepReadError instead?
+        not_found_statuses = frozenset([403, 404, 410])
+        not_founds = sum(1 for ks in roots_map.values()
+                         if ks.last_status() in not_found_statuses)
+        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
+            raise arvados.errors.NotFoundError(loc_s)
+        else:
+            raise arvados.errors.KeepReadError(loc_s)
- def get_url(self, url, headers, expect_hash):
- h = httplib2.Http()
- try:
- _logger.debug("Request: GET %s", url)
- with timer.Timer() as t:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
- len(content), t.msecs,
- (len(content)/(1024*1024))/t.secs)
- if re.match(r'^2\d\d$', resp['status']):
- md5 = hashlib.md5(content).hexdigest()
- if md5 == expect_hash:
- return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
- except Exception as e:
- _logger.debug("Request fail: GET %s => %s: %s",
- url, type(e), str(e))
- return None
-
- def put(self, data, copies=2):
+    def put(self, data, copies=2, num_retries=0):
+        """Save data in Keep.
+
+        This method will get a list of Keep services from the API server, and
+        send the data to each one simultaneously in a new thread.  Once the
+        uploads are finished, if enough copies are saved, this method returns
+        the most recent HTTP response body.  If requests fail to upload
+        enough copies, this method raises KeepWriteError.
+
+        Arguments:
+        * data: The string of data to upload.
+        * copies: The number of copies that the user requires be saved.
+          Default 2.  If less than 1, nothing is uploaded and the MD5
+          hex digest of the data is returned.
+        * num_retries: The number of times to retry PUT requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Default 0.
+        """
data_hash = hashlib.md5(data).hexdigest()
- have_copies = 0
- want_copies = copies
- if not (want_copies > 0):
+        if copies < 1:
return data_hash
- threads = []
- thread_limiter = KeepClient.ThreadLimiter(want_copies)
- for service_root in self.shuffled_service_roots(data_hash):
- t = KeepClient.KeepWriterThread(
- self.api_token,
- data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- timeout=self.timeout,
- using_proxy=self.using_proxy,
- want_copies=(want_copies if self.using_proxy else 1))
- t.start()
- threads += [t]
- for t in threads:
- t.join()
- if thread_limiter.done() < want_copies:
- # Retry the threads (i.e., services) that failed the first
- # time around.
- threads_retry = []
+
+        roots_map = {}
+        thread_limiter = KeepClient.ThreadLimiter(copies)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                # Look up the services before choosing headers: when the
+                # services come from the API server, using_proxy is only
+                # known after build_service_roots() has run.
+                self.build_service_roots(
+                    force_rebuild=(tries_left < num_retries))
+                headers = {}
+                if self.using_proxy:
+                    # Tell the proxy how many copies we want it to store
+                    headers['X-Keep-Desired-Replication'] = str(copies)
+                # The list was refreshed just above, so don't force a
+                # second rebuild here.
+                self.map_new_services(
+                    roots_map, data_hash, force_rebuild=False, **headers)
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            # Start one writer thread per service that has not yet
+            # produced a definite result.
+            threads = []
+            for service_root, ks in roots_map.iteritems():
+                if ks.finished():
+                    continue
+                t = KeepClient.KeepWriterThread(
+                    ks,
+                    data=data,
+                    data_hash=data_hash,
+                    service_root=service_root,
+                    thread_limiter=thread_limiter,
+                    timeout=self.timeout)
+                t.start()
+                threads.append(t)
for t in threads:
- if not t.success():
- _logger.debug("Retrying: PUT %s %s",
- t.args['service_root'],
- t.args['data_hash'])
- retry_with_args = t.args.copy()
- t_retry = KeepClient.KeepWriterThread(self.api_token,
- **retry_with_args)
- t_retry.start()
- threads_retry += [t_retry]
- for t in threads_retry:
t.join()
- have_copies = thread_limiter.done()
- # If we're done, return the response from Keep
- if have_copies >= want_copies:
+            # Report whether enough copies exist, and how many services
+            # were tried this round (see _check_loop_result).
+            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+        if loop.success():
return thread_limiter.response()
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
- (data_hash, want_copies, have_copies))
-
+            (data_hash, copies, thread_limiter.done()))
def local_store_put(self, data):
md5 = hashlib.md5(data).hexdigest()
-# usage example:
-#
-# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
-
+import mock
import os
import unittest
import arvados
+import arvados.retry
import run_test_server
+from arvados_testutil import fake_httplib2_response
class KeepTestCase(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
'baz2',
'wrong content from Keep.get(md5("baz2"))')
self.assertTrue(keep_client.using_proxy)
+
+
+class KeepClientRetryTestMixin(object):
+    # Shared tests for the retry behavior of KeepClient methods.
+    #
+    # Testing with a local Keep store won't exercise the retry behavior.
+    # Instead, our strategy is:
+    # * Create a client with one proxy specified (pointed at a black
+    #   hole), so there's no need to instantiate an API client, and
+    #   all HTTP requests come from one place.
+    # * Mock httplib2's request method to provide simulated responses.
+    # This lets us test the retry logic extensively without relying on any
+    # supporting servers, and prevents side effects in case something hiccups.
+    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
+    # run_method().
+    PROXY_ADDR = 'http://[100::]/'
+    TEST_DATA = 'testdata'
+    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
+
+    @staticmethod
+    def mock_responses(body, *codes):
+        # Build a patcher that makes httplib2.Http.request answer with
+        # one simulated response per status code, in order, each paired
+        # with the same body.
+        canned = ((fake_httplib2_response(status), body)
+                  for status in codes)
+        return mock.patch('httplib2.Http.request', side_effect=canned)
+
+    def new_client(self):
+        # A fresh client that sends everything to PROXY_ADDR.
+        return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+
+    def run_method(self, *args, **kwargs):
+        raise NotImplementedError("test subclasses must define run_method")
+
+    def check_success(self, expected=None, *args, **kwargs):
+        # Assert that run_method() returns `expected` (DEFAULT_EXPECT if
+        # not given).
+        wanted = self.DEFAULT_EXPECT if expected is None else expected
+        actual = self.run_method(*args, **kwargs)
+        self.assertEqual(wanted, actual)
+
+    def check_exception(self, error_class=None, *args, **kwargs):
+        # Assert that run_method() raises `error_class`
+        # (DEFAULT_EXCEPTION if not given).
+        expected_error = (self.DEFAULT_EXCEPTION if error_class is None
+                          else error_class)
+        with self.assertRaises(expected_error):
+            self.run_method(*args, **kwargs)
+
+    def test_immediate_success(self):
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 200)
+        with responses:
+            self.check_success()
+
+    def test_retry_then_success(self):
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 500, 200)
+        with responses:
+            self.check_success(num_retries=3)
+
+    def test_no_default_retry(self):
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 500, 200)
+        with responses:
+            self.check_exception()
+
+    def test_no_retry_after_permanent_error(self):
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 403, 200)
+        with responses:
+            self.check_exception(num_retries=3)
+
+    def test_error_after_retries_exhausted(self):
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200)
+        with responses:
+            self.check_exception(num_retries=1)
+
+
+# Patch out time.sleep so the retry loops' exponential backoff doesn't
+# slow the tests down.
+no_backoff = mock.patch('time.sleep', lambda n: None)
+@no_backoff
+class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    # Runs the shared retry tests against KeepClient.get(), and adds
+    # GET-specific checks for error classification and locator hints.
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
+    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
+
+    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+                   *args, **kwargs):
+        keep_client = self.new_client()
+        return keep_client.get(locator, *args, **kwargs)
+
+    def test_specific_exception_when_not_found(self):
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 404, 200)
+        with responses:
+            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+
+    def test_general_exception_with_mixed_errors(self):
+        # get should raise a NotFoundError if no server returns the block,
+        # and a high threshold of servers report that it's not found.
+        # This test rigs up 50/50 disagreement between two servers, and
+        # checks that it does not become a NotFoundError.
+        keep_client = self.new_client()
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 404, 500)
+        with responses:
+            with self.assertRaises(arvados.errors.KeepReadError) as caught:
+                keep_client.get(self.HINTED_LOCATOR)
+            self.assertNotIsInstance(
+                caught.exception, arvados.errors.NotFoundError,
+                "mixed errors raised NotFoundError")
+
+    def test_hint_server_can_succeed_without_retries(self):
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500)
+        with responses:
+            self.check_success(locator=self.HINTED_LOCATOR)
+
+
+@no_backoff
+class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    # Runs the shared retry tests against KeepClient.put().
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
+    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
+
+    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
+                   copies=1, *args, **kwargs):
+        keep_client = self.new_client()
+        return keep_client.put(data, copies, *args, **kwargs)
+
+    def test_do_not_send_multiple_copies_to_same_server(self):
+        # With only one server available and a single successful
+        # response, asking for two copies must fail even with retries.
+        responses = self.mock_responses(self.DEFAULT_EXPECT, 200)
+        with responses:
+            self.check_exception(copies=2, num_retries=3)