From 4aaa1f6aeae33dc353ea3a70f901065b4f694fed Mon Sep 17 00:00:00 2001 From: Lucas Di Pentima Date: Thu, 16 Jun 2016 20:06:21 -0300 Subject: [PATCH] 9180: Changed some of the logic on ThreadLimiter and made unit tests to validate the new behaviour refs #9180 --- sdk/python/arvados/keep.py | 26 +++++++++-- sdk/python/tests/test_keep_client.py | 69 ++++++++++++++++++++++++---- 2 files changed, 81 insertions(+), 14 deletions(-) diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 3c0ad6f7a9..9e9fb00833 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -251,6 +251,7 @@ class KeepClient(object): self._started = 0 self._want_copies = want_copies self._done = 0 + self._thread_failures = 0 self._response = None self._start_lock = threading.Condition() if (not max_service_replicas) or (max_service_replicas >= want_copies): @@ -260,6 +261,7 @@ class KeepClient(object): _logger.debug("Limiter max threads is %d", max_threads) self._todo_lock = threading.Semaphore(max_threads) self._done_lock = threading.Lock() + self._thread_failures_lock = threading.Lock() self._local = threading.local() def __enter__(self): @@ -276,7 +278,14 @@ class KeepClient(object): return self def __exit__(self, type, value, traceback): - self._todo_lock.release() + with self._thread_failures_lock: + if self._thread_failures > 0: + self._thread_failures -= 1 + self._todo_lock.release() + + # If work is finished, release al pending threads + if not self.shall_i_proceed(): + self._todo_lock.release() def set_sequence(self, sequence): self._local.sequence = sequence @@ -294,9 +303,14 @@ class KeepClient(object): Records a response body (a locator, possibly signed) returned by the Keep server, and the number of replicas it stored. """ - with self._done_lock: - self._done += replicas_stored - self._response = response_body + if replicas_stored == 0: + # Failure notification, should start a new thread to try to reach full replication + with self._thread_failures_lock: + self._thread_failures += 1 + else: + with self._done_lock: + self._done += replicas_stored + self._response = response_body def response(self): """Return the body from the response to a PUT request.""" @@ -612,6 +626,10 @@ class KeepClient(object): self.args['data_hash'], result['status_code'], result['body']) + if not self._success: + # Notify the failure so that the Thread limiter allows + # a new one to run. + limiter.save_response(None, 0) def __init__(self, api_client=None, proxy=None, diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py index eff344cc54..33b96fe297 100644 --- a/sdk/python/tests/test_keep_client.py +++ b/sdk/python/tests/test_keep_client.py @@ -1064,18 +1064,67 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase): with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200): self.check_exception(copies=2, num_retries=3) -# @tutil.skip_sleep + class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock): - def setUp(self): - self.api_client = self.mock_keep_services(count=4) - self.keep_client = arvados.KeepClient(api_client=self.api_client) - def test_only_send_enough_on_success(self): - with tutil.mock_keep_responses( - 'acbd18db4cc2f85cedef654fccc4a4d8+3', - 200, 200, 200, 200) as req_mock: - self.keep_client.put('foo', num_retries=1, copies=2) - self.assertEqual(3, req_mock.call_count) + + class KeepFakeWriterThread(threading.Thread): + """ + Just Simulating the real KeepClient.KeepWriterThread, to test the ThreadLimiter. + """ + def __init__(self, delay, will_succeed, thread_limiter): + super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__() + self.delay = delay # in seconds + self.success = will_succeed + self.limiter = thread_limiter + + def run(self): + with self.limiter: + if not self.limiter.shall_i_proceed(): + return + time.sleep(self.delay) + if self.success: + self.limiter.save_response('foo', 1) + else: + self.limiter.save_response(None, 0) + + def test_only_write_enough_on_success(self): + copies = 3 + threads = [] + limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1) + # Setting up fake writer threads with different delays so that the bug is revealed + for i in range(copies*2): + t = self.KeepFakeWriterThread( + delay=i/10.0, + will_succeed=True, + thread_limiter=limiter) + t.start() + threads.append(t) + for t in threads: + t.join() + self.assertEqual(limiter.done(), copies) + + def test_only_write_enough_on_partial_failure(self): + copies = 3 + threads = [] + limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1) + for i in range(copies): + t = self.KeepFakeWriterThread( + delay=i/10.0, + will_succeed=False, + thread_limiter=limiter) + t.start() + threads.append(t) + t = self.KeepFakeWriterThread( + delay=i/10.0, + will_succeed=True, + thread_limiter=limiter) + t.start() + threads.append(t) + for t in threads: + t.join() + self.assertEqual(limiter.done(), copies) + @tutil.skip_sleep class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock): -- 2.30.2