7696: PySDK determines max_replicas_per_service after querying services.
author: Brett Smith <brett@curoverse.com>
Wed, 11 Nov 2015 21:50:18 +0000 (16:50 -0500)
committer: Brett Smith <brett@curoverse.com>
Fri, 13 Nov 2015 14:29:24 +0000 (09:29 -0500)
KeepClient set max_replicas_per_service to 1 when it was instantiated
without direct information about the available Keep services. In
addition, put() instantiated its ThreadLimiter before querying the
available Keep services (via map_new_services). Together, these meant
that the first write request sent to non-disk Keep services would let
multiple writer threads run at once. This commit fixes that race
condition and adds a test that triggered it semi-reliably.

sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py

index b3d64a41981b7bcbaf293a2f90f43abc79128f1a..2b718d7a4d58397e8e8d8aaec6cb81d078e65843 100644 (file)
@@ -645,6 +645,7 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.max_replicas_per_service = None
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -658,7 +659,6 @@ class KeepClient(object):
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = None
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -671,7 +671,6 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
-                self.max_replicas_per_service = 1
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -955,10 +954,8 @@ class KeepClient(object):
         # Tell the proxy how many copies we want it to store
         headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        thread_sequence = 0
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -968,6 +965,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
@@ -980,10 +978,9 @@ class KeepClient(object):
                     service_root=service_root,
                     thread_limiter=thread_limiter,
                     timeout=self.current_timeout(num_retries-tries_left),
-                    thread_sequence=thread_sequence)
+                    thread_sequence=len(threads))
                 t.start()
                 threads.append(t)
-                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
index a5d9925a750a2ee1c88b89e947441c0a01c8d0a3..ada0dac80e2e8daada36f143400ab90d7d9db6d6 100644 (file)
@@ -404,6 +404,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
             actual = keep_client.put(body, copies=1)
         self.assertEqual(pdh, actual)
 
+    def test_oddball_service_writer_count(self):
+        body = 'oddball service writer count'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore',
+                                             count=4)
+        headers = {'x-keep-replicas-stored': 3}
+        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+                                       **headers) as req_mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=2)
+        self.assertEqual(pdh, actual)
+        self.assertEqual(1, req_mock.call_count)
+
 
 @tutil.skip_sleep
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):