finally:
self._cache_lock.release()
- def add_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+ def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
# to KeepService objects. Poll for Keep services, and add any
# new ones to roots_map. Return the current list of local
roots_map[root] = self.KeepService(root, **headers)
return local_roots
+ @staticmethod
+ def _check_loop_result(result):
+ # Success check passed to the KeepClient RetryLoops. Callers save
+ # either an Exception or a 2-tuple: the actual result of the request,
+ # and the number of servers that received the request.
+ # Returns True if there's a real result, False if no servers
+ # received the request, otherwise None (including a saved Exception).
+ if isinstance(result, Exception):  # a saved error is not a 2-tuple; don't unpack it
+ return None
+ result, tried_server_count = result
+ if (result is not None) and (result is not False):  # anything but None/False counts as a real result
+ return True
+ elif tried_server_count < 1:
+ _logger.info("No more Keep services to try; giving up")
+ return False
+ else:
+ return None
+
def get(self, loc_s, num_retries=0):
"""Get data from Keep.
# Map root URLs to their KeepService objects.
roots_map = {root: self.KeepService(root) for root in hint_roots}
blob = None
- loop = retry.HTTPRetryLoop(num_retries,
- lambda r: None if blob is None else True)
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
for tries_left in loop:
try:
- local_roots = self.add_new_services(
+ local_roots = self.map_new_services(
roots_map, expect_hash,
force_rebuild=(tries_left < num_retries))
except Exception as error:
loop.save_result(error)
continue
- # Build an ordered list of KeepService objects that haven't
- # returned permanent failure.
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
services_to_try = [roots_map[root]
for root in (local_roots + hint_roots)
if roots_map[root].usable()]
- if not services_to_try:
- _logger.info(
- "All Keep services for %s have permafailed; giving up",
- loc_s)
- break
-
http = httplib2.Http(timeout=self.timeout)
for keep_service in services_to_try:
blob = keep_service.get(http, locator)
if blob is not None:
- loop.save_result(blob)
break
+ loop.save_result((blob, len(services_to_try)))
# Always cache the result, then return it if we succeeded.
slot.set(blob)
# Not Found; otherwise a generic error.
not_founds = sum(1 for ks in roots_map.values()
if ks.last_status() in set([403, 404, 410]))
- if (float(not_founds) / len(roots_map)) >= .75:
+ if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
raise arvados.errors.NotFoundError(loc_s)
else:
raise arvados.errors.KeepReadError(loc_s)
headers = {}
if self.using_proxy:
- # We're using a proxy, so tell the proxy how many copies we
- # want it to store
+ # Tell the proxy how many copies we want it to store
headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
thread_limiter = KeepClient.ThreadLimiter(copies)
- loop = retry.HTTPRetryLoop(
- num_retries,
- lambda r: True if (thread_limiter.done() >= copies) else None)
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
for tries_left in loop:
try:
- local_roots = self.add_new_services(
+ local_roots = self.map_new_services(
roots_map, data_hash,
force_rebuild=(tries_left < num_retries), **headers)
except Exception as error:
timeout=self.timeout)
t.start()
threads.append(t)
-
- if not threads:
- _logger.info(
- "All Keep services for %s have finished; giving up",
- data_hash)
- break
for t in threads:
t.join()
- loop.save_result(None)
+ loop.save_result((thread_limiter.done() >= copies, len(threads)))
if loop.success():
return thread_limiter.response()
class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+ HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
*args, **kwargs):
# This test rigs up 50/50 disagreement between two servers, and
# checks that it does not become a NotFoundError.
client = self.new_client()
- client.service_roots = [self.PROXY_ADDR, self.PROXY_ADDR]
with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
- client.get(self.TEST_LOCATOR)
+ client.get(self.HINTED_LOCATOR)
self.assertNotIsInstance(
exc_check.exception, arvados.errors.NotFoundError,
"mixed errors raised NotFoundError")
def test_hint_server_can_succeed_without_retries(self):
with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):  # NOTE(review): presumably one mocked status per request, in order — confirm against mock_responses
- self.check_success(locator=self.TEST_LOCATOR + '+K@xyzzy')
+ self.check_success(locator=self.HINTED_LOCATOR)  # HINTED_LOCATOR is TEST_LOCATOR + '+K@xyzzy'
@no_backoff