- def test_only_send_enough_on_success(self):
- with tutil.mock_keep_responses(
- 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- 200, 200, 200, 200) as req_mock:
- self.keep_client.put('foo', num_retries=1, copies=2)
- self.assertEqual(3, req_mock.call_count)
+
+ class KeepFakeWriterThread(threading.Thread):
+ """
+ Just Simulating the real KeepClient.KeepWriterThread, to test the ThreadLimiter.
+ """
+ def __init__(self, delay, will_succeed, thread_limiter):
+ super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__()
+ self.delay = delay # in seconds
+ self.success = will_succeed
+ self.limiter = thread_limiter
+
+ def run(self):
+ with self.limiter:
+ if not self.limiter.shall_i_proceed():
+ return
+ time.sleep(self.delay)
+ if self.success:
+ self.limiter.save_response('foo', 1)
+ else:
+ self.limiter.save_response(None, 0)
+
+ def test_only_write_enough_on_success(self):
+ copies = 3
+ threads = []
+ limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
+ # Setting up fake writer threads with different delays so that the bug is revealed
+ for i in range(copies*2):
+ t = self.KeepFakeWriterThread(
+ delay=i/10.0,
+ will_succeed=True,
+ thread_limiter=limiter)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+ self.assertEqual(limiter.done(), copies)
+
+ def test_only_write_enough_on_partial_failure(self):
+ copies = 3
+ threads = []
+ limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
+ for i in range(copies):
+ t = self.KeepFakeWriterThread(
+ delay=i/10.0,
+ will_succeed=False,
+ thread_limiter=limiter)
+ t.start()
+ threads.append(t)
+ t = self.KeepFakeWriterThread(
+ delay=i/10.0,
+ will_succeed=True,
+ thread_limiter=limiter)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+ self.assertEqual(limiter.done(), copies)
+