3147: Fixup KeepClient's RetryLoop use.
author Brett Smith <brett@curoverse.com>
Sun, 24 Aug 2014 15:49:00 +0000 (11:49 -0400)
committer Brett Smith <brett@curoverse.com>
Sun, 24 Aug 2014 15:49:00 +0000 (11:49 -0400)
Switch to RetryLoop, and use the loop end logic better.

sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py

index 4a4998a37daa659fe9dc024cb1416339da55b0fa..b9aa9db97bcd3093d2e5adddce508f00e345a6b8 100644 (file)
@@ -514,7 +514,7 @@ class KeepClient(object):
         finally:
             self._cache_lock.release()
 
-    def add_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
@@ -526,6 +526,24 @@ class KeepClient(object):
                 roots_map[root] = self.KeepService(root, **headers)
         return local_roots
 
+    @staticmethod
+    def _check_loop_result(result):
+        # KeepClient RetryLoops save each result as a 2-tuple: (the actual
+        # result of the request, the number of servers that received it).
+        # Returns True for a real result (neither None nor False); False,
+        # after logging, when no server received the request; otherwise
+        # None -- which also covers a saved Exception instead of a tuple.
+        if isinstance(result, Exception):
+            return None
+        result, tried_server_count = result
+        if (result is not None) and (result is not False):
+            return True
+        elif tried_server_count < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        else:
+            return None
+
     def get(self, loc_s, num_retries=0):
         """Get data from Keep.
 
@@ -565,34 +583,28 @@ class KeepClient(object):
         # Map root URLs their KeepService objects.
         roots_map = {root: self.KeepService(root) for root in hint_roots}
         blob = None
-        loop = retry.HTTPRetryLoop(num_retries,
-                                   lambda r: None if blob is None else True)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.add_new_services(
+                local_roots = self.map_new_services(
                     roots_map, expect_hash,
                     force_rebuild=(tries_left < num_retries))
             except Exception as error:
                 loop.save_result(error)
                 continue
 
-            # Build an ordered list of KeepService objects that haven't
-            # returned permanent failure.
+            # Query KeepService objects that haven't returned
+            # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
-            if not services_to_try:
-                _logger.info(
-                    "All Keep services for %s have permafailed; giving up",
-                    loc_s)
-                break
-
             http = httplib2.Http(timeout=self.timeout)
             for keep_service in services_to_try:
                 blob = keep_service.get(http, locator)
                 if blob is not None:
-                    loop.save_result(blob)
                     break
+            loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
         slot.set(blob)
@@ -605,7 +617,7 @@ class KeepClient(object):
         # Not Found; otherwise a generic error.
         not_founds = sum(1 for ks in roots_map.values()
                          if ks.last_status() in set([403, 404, 410]))
-        if (float(not_founds) / len(roots_map)) >= .75:
+        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
             raise arvados.errors.NotFoundError(loc_s)
         else:
             raise arvados.errors.KeepReadError(loc_s)
@@ -633,17 +645,15 @@ class KeepClient(object):
 
         headers = {}
         if self.using_proxy:
-            # We're using a proxy, so tell the proxy how many copies we
-            # want it to store
+            # Tell the proxy how many copies we want it to store
             headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
         thread_limiter = KeepClient.ThreadLimiter(copies)
-        loop = retry.HTTPRetryLoop(
-            num_retries,
-            lambda r: True if (thread_limiter.done() >= copies) else None)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.add_new_services(
+                local_roots = self.map_new_services(
                     roots_map, data_hash,
                     force_rebuild=(tries_left < num_retries), **headers)
             except Exception as error:
@@ -663,15 +673,9 @@ class KeepClient(object):
                     timeout=self.timeout)
                 t.start()
                 threads.append(t)
-
-            if not threads:
-                _logger.info(
-                    "All Keep services for %s have finished; giving up",
-                    data_hash)
-                break
             for t in threads:
                 t.join()
-            loop.save_result(None)
+            loop.save_result((thread_limiter.done() >= copies, len(threads)))
 
         if loop.success():
             return thread_limiter.response()
index 3476069cd66369fd34dc4eca6c2493fc72263b03..4ac9df17ecf838b4b65685636af0f8ab5b59c3b1 100644 (file)
@@ -283,6 +283,7 @@ no_backoff = mock.patch('time.sleep', lambda n: None)
 class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
 
     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                    *args, **kwargs):
@@ -298,17 +299,16 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
         # This test rigs up 50/50 disagreement between two servers, and
         # checks that it does not become a NotFoundError.
         client = self.new_client()
-        client.service_roots = [self.PROXY_ADDR, self.PROXY_ADDR]
         with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
-                client.get(self.TEST_LOCATOR)
+                client.get(self.HINTED_LOCATOR)
             self.assertNotIsInstance(
                 exc_check.exception, arvados.errors.NotFoundError,
                 "mixed errors raised NotFoundError")
 
     def test_hint_server_can_succeed_without_retries(self):
         with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
-            self.check_success(locator=self.TEST_LOCATOR + '+K@xyzzy')
+            self.check_success(locator=self.HINTED_LOCATOR)
 
 
 @no_backoff