From 5a043a14a8d321adeb1fc14c9cd6f479f8ea8216 Mon Sep 17 00:00:00 2001
From: Brett Smith
Date: Fri, 22 Aug 2014 13:38:40 -0400
Subject: [PATCH] 3147: Add retry support to Python SDK's KeepClient.

---
 sdk/python/arvados/errors.py         |   6 +-
 sdk/python/arvados/keep.py           | 419 +++++++++++++++++----------
 sdk/python/tests/test_keep_client.py | 112 ++++++-
 3 files changed, 383 insertions(+), 154 deletions(-)

diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index 1d9c77851e..89910aa60f 100644
--- a/sdk/python/arvados/errors.py
+++ b/sdk/python/arvados/errors.py
@@ -17,12 +17,14 @@ class SyntaxError(Exception):
     pass
 class AssertionError(Exception):
     pass
-class NotFoundError(Exception):
-    pass
 class CommandFailedError(Exception):
     pass
+class KeepReadError(Exception):
+    pass
 class KeepWriteError(Exception):
     pass
+class NotFoundError(KeepReadError):
+    pass
 class NotImplementedError(Exception):
     pass
 class NoKeepServersError(Exception):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 909ee1fec4..e75d64e57f 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,6 +27,7 @@ global_client_object = None
 import arvados
 import arvados.config as config
 import arvados.errors
+import arvados.retry as retry
 import arvados.util
 
 class KeepLocator(object):
@@ -200,15 +201,83 @@ class KeepClient(object):
             return self._done
 
+    class KeepService(object):
+        # Make requests to a single Keep service, and track results.
+        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+                       ssl.SSLError)
+
+        def __init__(self, root, **headers):
+            self.root = root
+            self.last_result = None
+            self.success_flag = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+
+        def usable(self):
+            return self.success_flag is not False
+
+        def finished(self):
+            return self.success_flag is not None
+
+        def last_status(self):
+            try:
+                return int(self.last_result[0].status)
+            except (AttributeError, IndexError, ValueError):
+                return None
+
+        def get(self, http, locator):
+            # http is an httplib2.Http object.
+            # locator is a KeepLocator object.
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            try:
+                with timer.Timer() as t:
+                    result = http.request(url.encode('utf-8'), 'GET',
+                                          headers=self.get_headers)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+                content = result[1]
+                _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+                             self.last_status(), len(content), t.msecs,
+                             (len(content)/(1024*1024))/t.secs)
+                if self.success_flag:
+                    resp_md5 = hashlib.md5(content).hexdigest()
+                    if resp_md5 == locator.md5sum:
+                        return content
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
+            return None
+
+        def put(self, http, hash_s, body):
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            try:
+                result = http.request(url.encode('utf-8'), 'PUT',
+                                      headers=self.put_headers, body=body)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+            return self.success_flag
+
     class KeepWriterThread(threading.Thread):
         """
         Write a blob of data to the given Keep server.
         On success, call save_response() of the given ThreadLimiter to
         save the returned locator.
         """
-        def __init__(self, api_token, **kwargs):
+        def __init__(self, keep_service, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
-            self._api_token = api_token
+            self.service = keep_service
             self.args = kwargs
             self._success = False
 
@@ -224,51 +293,35 @@ class KeepClient(object):
             self.run_with_limiter(limiter)
 
         def run_with_limiter(self, limiter):
+            if self.service.finished():
+                return
             _logger.debug("KeepWriterThread %s proceeding %s %s",
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
             h = httplib2.Http(timeout=self.args.get('timeout', None))
-            url = self.args['service_root'] + self.args['data_hash']
-            headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
-            if self.args['using_proxy']:
-                # We're using a proxy, so tell the proxy how many copies we
-                # want it to store
-                headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
-            try:
-                _logger.debug("Uploading to {}".format(url))
-                resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                          headers=headers,
-                                          body=self.args['data'])
-                if re.match(r'^2\d\d$', resp['status']):
-                    self._success = True
-                    _logger.debug("KeepWriterThread %s succeeded %s %s",
-                                  str(threading.current_thread()),
-                                  self.args['data_hash'],
-                                  self.args['service_root'])
+            self._success = bool(self.service.put(
+                h, self.args['data_hash'], self.args['data']))
+            status = self.service.last_status()
+            if self._success:
+                resp, body = self.service.last_result
+                _logger.debug("KeepWriterThread %s succeeded %s %s",
+                              str(threading.current_thread()),
+                              self.args['data_hash'],
+                              self.args['service_root'])
+                # Tick the 'done' counter for the number of replicas
+                # reported stored by the server, for the case that
+                # we're talking to a proxy or other backend that
+                # stores multiple copies for us.
+                try:
+                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                except (KeyError, ValueError):
                     replicas_stored = 1
-                    if 'x-keep-replicas-stored' in resp:
-                        # Tick the 'done' counter for the number of replica
-                        # reported stored by the server, for the case that
-                        # we're talking to a proxy or other backend that
-                        # stores to multiple copies for us.
-                        try:
-                            replicas_stored = int(resp['x-keep-replicas-stored'])
-                        except ValueError:
-                            pass
-                    limiter.save_response(content.strip(), replicas_stored)
-                else:
-                    _logger.debug("Request fail: PUT %s => %s %s",
-                                  url, resp['status'], content)
-            except (httplib2.HttpLib2Error,
-                    httplib.HTTPException,
-                    ssl.SSLError) as e:
-                # When using https, timeouts look like ssl.SSLError from here.
-                # "SSLError: The write operation timed out"
-                _logger.debug("Request fail: PUT %s => %s: %s",
-                              url, type(e), str(e))
+                limiter.save_response(body.strip(), replicas_stored)
+            elif status is not None:
+                _logger.debug("Request fail: PUT %s => %s %s",
+                              self.args['data_hash'], status,
+                              self.service.last_result[1])
 
     def __init__(self, api_client=None, proxy=None, timeout=60,
@@ -323,6 +376,7 @@ class KeepClient(object):
             self.api_token = api_token
             self.service_roots = [proxy]
             self.using_proxy = True
+            self.static_service_roots = True
         else:
             # It's important to avoid instantiating an API client
             # unless we actually need one, for testing's sake.
@@ -332,29 +386,35 @@ class KeepClient(object):
             self.api_token = api_client.api_token
             self.service_roots = None
             self.using_proxy = None
+            self.static_service_roots = False
 
-    def shuffled_service_roots(self, hash):
-        if self.service_roots is None:
-            with self.lock:
-                try:
-                    keep_services = self.api_client.keep_services().accessible()
-                except Exception:  # API server predates Keep services.
-                    keep_services = self.api_client.keep_disks().list()
+    def build_service_roots(self, force_rebuild=False):
+        if (self.static_service_roots or
+              (self.service_roots and not force_rebuild)):
+            return
+        with self.lock:
+            try:
+                keep_services = self.api_client.keep_services().accessible()
+            except Exception:  # API server predates Keep services.
+                keep_services = self.api_client.keep_disks().list()
 
-                keep_services = keep_services.execute().get('items')
-                if not keep_services:
-                    raise arvados.errors.NoKeepServersError()
+            keep_services = keep_services.execute().get('items')
+            if not keep_services:
+                raise arvados.errors.NoKeepServersError()
 
-                self.using_proxy = (keep_services[0].get('service_type') ==
-                                    'proxy')
+            self.using_proxy = (keep_services[0].get('service_type') ==
+                                'proxy')
 
-                roots = (("http%s://%s:%d/" %
-                          ('s' if f['service_ssl_flag'] else '',
-                           f['service_host'],
-                           f['service_port']))
-                         for f in keep_services)
-                self.service_roots = sorted(set(roots))
-                _logger.debug(str(self.service_roots))
+            roots = (("http%s://%s:%d/" %
+                      ('s' if f['service_ssl_flag'] else '',
+                       f['service_host'],
+                       f['service_port']))
+                     for f in keep_services)
+            self.service_roots = sorted(set(roots))
+            _logger.debug(str(self.service_roots))
+
+    def shuffled_service_roots(self, hash, force_rebuild=False):
+        self.build_service_roots(force_rebuild)
 
         # Build an ordering with which to query the Keep servers based on the
         # contents of the hash.
@@ -454,113 +514,176 @@ class KeepClient(object):
         finally:
             self._cache_lock.release()
 
-    def get(self, loc_s):
+    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+        # roots_map is a dictionary, mapping Keep service root strings
+        # to KeepService objects. Poll for Keep services, and add any
+        # new ones to roots_map. Return the current list of local
+        # root strings.
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        for root in local_roots:
+            if root not in roots_map:
+                roots_map[root] = self.KeepService(root, **headers)
+        return local_roots
+
+    @staticmethod
+    def _check_loop_result(result):
+        # KeepClient RetryLoops should save results as a 2-tuple: the
+        # actual result of the request, and the number of servers available
+        # to receive the request this round.
+        # This method returns True if there's a real result, False if
+        # there are no more servers available, otherwise None.
+        if isinstance(result, Exception):
+            return None
+        result, tried_server_count = result
+        if (result is not None) and (result is not False):
+            return True
+        elif tried_server_count < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        else:
+            return None
+
+    def get(self, loc_s, num_retries=0):
+        """Get data from Keep.
+
+        This method fetches one or more blocks of data from Keep. It
+        sends a request to each Keep service registered with the API
+        server (or the proxy provided when this client was
+        instantiated), then each service named in location hints, in
+        sequence. As soon as one service provides the data, it's
+        returned.
+
+        Arguments:
+        * loc_s: A string of one or more comma-separated locators to fetch.
+          This method returns the concatenation of these blocks.
+        * num_retries: The number of times to retry GET requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff. Note that, in each loop, the method may try
+          to fetch data from every available Keep service, along with any
+          that are named in location hints in the locator. Default 0.
+        """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
         slot, first = self.reserve_cache(expect_hash)
-
         if not first:
             v = slot.get()
             return v
-        try:
-            for service_root in self.shuffled_service_roots(expect_hash):
-                url = service_root + loc_s
-                headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
-                           'Accept': 'application/octet-stream'}
-                blob = self.get_url(url, headers, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-
-            for hint in locator.hints:
-                if not hint.startswith('K@'):
-                    continue
-                url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
-                blob = self.get_url(url, {}, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-        except:
-            slot.set(None)
-            self.cap_cache()
-            raise
-
-        slot.set(None)
+        # See #3147 for a discussion of the loop implementation. Highlights:
+        # * Refresh the list of Keep services after each failure, in case
+        #   it's being updated.
+        # * Retry until we succeed, we're out of retries, or every available
+        #   service has returned permanent failure.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@')]
+        # Map root URLs to their KeepService objects.
+        roots_map = {root: self.KeepService(root) for root in hint_roots}
+        blob = None
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, expect_hash,
+                    force_rebuild=(tries_left < num_retries))
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            # Query KeepService objects that haven't returned
+            # permanent failure, in our specified shuffle order.
+            services_to_try = [roots_map[root]
+                               for root in (local_roots + hint_roots)
+                               if roots_map[root].usable()]
+            http = httplib2.Http(timeout=self.timeout)
+            for keep_service in services_to_try:
+                blob = keep_service.get(http, locator)
+                if blob is not None:
+                    break
+            loop.save_result((blob, len(services_to_try)))
+
+        # Always cache the result, then return it if we succeeded.
+        slot.set(blob)
         self.cap_cache()
-        raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+        if loop.success():
+            return blob
+
+        # No servers fulfilled the request. Count how many responded
+        # "not found"; if the ratio is high enough (currently 75%), report
+        # Not Found; otherwise a generic error.
+        # Q: Including 403 is necessary for the Keep tests to continue
+        # passing, but maybe they should expect KeepReadError instead?
+        not_founds = sum(1 for ks in roots_map.values()
+                         if ks.last_status() in set([403, 404, 410]))
+        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
+            raise arvados.errors.NotFoundError(loc_s)
+        else:
+            raise arvados.errors.KeepReadError(loc_s)
 
-    def get_url(self, url, headers, expect_hash):
-        h = httplib2.Http()
-        try:
-            _logger.debug("Request: GET %s", url)
-            with timer.Timer() as t:
-                resp, content = h.request(url.encode('utf-8'), 'GET',
-                                          headers=headers)
-            _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
-                         len(content), t.msecs,
-                         (len(content)/(1024*1024))/t.secs)
-            if re.match(r'^2\d\d$', resp['status']):
-                md5 = hashlib.md5(content).hexdigest()
-                if md5 == expect_hash:
-                    return content
-                _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
-        except Exception as e:
-            _logger.debug("Request fail: GET %s => %s: %s",
-                          url, type(e), str(e))
-        return None
-
-    def put(self, data, copies=2):
+    def put(self, data, copies=2, num_retries=0):
+        """Save data in Keep.
+
+        This method will get a list of Keep services from the API server,
+        and send the data to each one in parallel, each in its own thread.
+        Once the uploads are finished, if enough copies are saved, this
+        method returns the most recent HTTP response body. If requests
+        fail to upload enough copies, this method raises KeepWriteError.
+
+        Arguments:
+        * data: The string of data to upload.
+        * copies: The number of copies that the user requires be saved.
+          Default 2.
+        * num_retries: The number of times to retry PUT requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff. Default 0.
+        """
         data_hash = hashlib.md5(data).hexdigest()
-        have_copies = 0
-        want_copies = copies
-        if not (want_copies > 0):
+        if copies < 1:
             return data_hash
-        threads = []
-        thread_limiter = KeepClient.ThreadLimiter(want_copies)
-        for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(
-                self.api_token,
-                data=data,
-                data_hash=data_hash,
-                service_root=service_root,
-                thread_limiter=thread_limiter,
-                timeout=self.timeout,
-                using_proxy=self.using_proxy,
-                want_copies=(want_copies if self.using_proxy else 1))
-            t.start()
-            threads += [t]
-        for t in threads:
-            t.join()
-        if thread_limiter.done() < want_copies:
-            # Retry the threads (i.e., services) that failed the first
-            # time around.
-            threads_retry = []
+
+        headers = {}
+        if self.using_proxy:
+            # Tell the proxy how many copies we want it to store
+            headers['X-Keep-Desired-Replication'] = str(copies)
+        roots_map = {}
+        thread_limiter = KeepClient.ThreadLimiter(copies)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, data_hash,
+                    force_rebuild=(tries_left < num_retries), **headers)
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            threads = []
+            for service_root, ks in roots_map.iteritems():
+                if ks.finished():
+                    continue
+                t = KeepClient.KeepWriterThread(
+                    ks,
+                    data=data,
+                    data_hash=data_hash,
+                    service_root=service_root,
+                    thread_limiter=thread_limiter,
+                    timeout=self.timeout)
+                t.start()
+                threads.append(t)
             for t in threads:
-                if not t.success():
-                    _logger.debug("Retrying: PUT %s %s",
-                                  t.args['service_root'],
-                                  t.args['data_hash'])
-                    retry_with_args = t.args.copy()
-                    t_retry = KeepClient.KeepWriterThread(self.api_token,
-                                                          **retry_with_args)
-                    t_retry.start()
-                    threads_retry += [t_retry]
-            for t in threads_retry:
                 t.join()
-        have_copies = thread_limiter.done()
-        # If we're done, return the response from Keep
-        if have_copies >= want_copies:
+            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+        if loop.success():
             return thread_limiter.response()
         raise arvados.errors.KeepWriteError(
             "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, want_copies, have_copies))
-
+            (data_hash, copies, thread_limiter.done()))
 
     def local_store_put(self, data):
         md5 = hashlib.md5(data).hexdigest()
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 6198919e8b..4ac9df17ec 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,12 +1,11 @@
-# usage example:
-#
-# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
-
+import mock
 import os
 import unittest
 
 import arvados
+import arvados.retry
 import run_test_server
+from arvados_testutil import fake_httplib2_response
 
 class KeepTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
@@ -219,3 +218,108 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
                          'baz2',
                          'wrong content from Keep.get(md5("baz2"))')
         self.assertTrue(keep_client.using_proxy)
+
+
+class KeepClientRetryTestMixin(object):
+    # Testing with a local Keep store won't exercise the retry behavior.
+    # Instead, our strategy is:
+    # * Create a client with one proxy specified (pointed at a black
+    #   hole), so there's no need to instantiate an API client, and
+    #   all HTTP requests come from one place.
+    # * Mock httplib2's request method to provide simulated responses.
+    # This lets us test the retry logic extensively without relying on any
+    # supporting servers, and prevents side effects in case something hiccups.
+    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
+    # run_method().
+    PROXY_ADDR = 'http://[100::]/'
+    TEST_DATA = 'testdata'
+    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
+
+    @staticmethod
+    def mock_responses(body, *codes):
+        return mock.patch('httplib2.Http.request', side_effect=(
+            (fake_httplib2_response(code), body) for code in codes))
+
+    def new_client(self):
+        return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+
+    def run_method(self, *args, **kwargs):
+        raise NotImplementedError("test subclasses must define run_method")
+
+    def check_success(self, expected=None, *args, **kwargs):
+        if expected is None:
+            expected = self.DEFAULT_EXPECT
+        self.assertEqual(expected, self.run_method(*args, **kwargs))
+
+    def check_exception(self, error_class=None, *args, **kwargs):
+        if error_class is None:
+            error_class = self.DEFAULT_EXCEPTION
+        self.assertRaises(error_class, self.run_method, *args, **kwargs)
+
+    def test_immediate_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_success()
+
+    def test_retry_then_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_success(num_retries=3)
+
+    def test_no_default_retry(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_exception()
+
+    def test_no_retry_after_permanent_error(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+            self.check_exception(num_retries=3)
+
+    def test_error_after_retries_exhausted(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+            self.check_exception(num_retries=1)
+
+
+# Don't delay from RetryLoop's exponential backoff.
+no_backoff = mock.patch('time.sleep', lambda n: None)
+@no_backoff
+class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
+    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
+
+    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+                   *args, **kwargs):
+        return self.new_client().get(locator, *args, **kwargs)
+
+    def test_specific_exception_when_not_found(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+
+    def test_general_exception_with_mixed_errors(self):
+        # get should raise a NotFoundError only when no server returns
+        # the block, and a high enough proportion of servers report that
+        # it's not found.
+        # This test rigs up 50/50 disagreement between two servers, and
+        # checks that it does not become a NotFoundError.
+        client = self.new_client()
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
+                client.get(self.HINTED_LOCATOR)
+            self.assertNotIsInstance(
+                exc_check.exception, arvados.errors.NotFoundError,
+                "mixed errors raised NotFoundError")
+
+    def test_hint_server_can_succeed_without_retries(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+            self.check_success(locator=self.HINTED_LOCATOR)
+
+
+@no_backoff
+class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
+    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
+
+    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
+                   copies=1, *args, **kwargs):
+        return self.new_client().put(data, copies, *args, **kwargs)
+
+    def test_do_not_send_multiple_copies_to_same_server(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_exception(copies=2, num_retries=3)
-- 
2.30.2
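
A minimal usage sketch, assuming this patch is applied. The proxy URL and
payload below are placeholders, and authentication setup is elided; in normal
use the client would be built from a configured API client rather than an
explicit proxy.

    import arvados
    import arvados.errors

    # Placeholder Keep proxy address; any reachable Keep proxy would do.
    keep_client = arvados.KeepClient(proxy='https://keep.zzzzz.example.org/')

    # Retry each PUT up to 3 times per server on temporary failures, with
    # exponential backoff (starting at 2 seconds, per the
    # RetryLoop(..., backoff_start=2) calls in the patch).
    try:
        locator = keep_client.put('example data', copies=2, num_retries=3)
    except arvados.errors.KeepWriteError:
        locator = None  # fewer than the requested copies were written

    # GET retries work the same way. NotFoundError subclasses KeepReadError,
    # so this except clause also catches the "mostly not found" case
    # reported by the 75% threshold in get().
    if locator is not None:
        try:
            data = keep_client.get(locator, num_retries=3)
        except arvados.errors.KeepReadError:
            data = None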