- pool.add_task(ks, None)
- pool.start()
- pool.join()
- self.assertEqual(pool.done(), copies)
-
-
class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
    """
    Drive a shared arvados.KeepClient.ThreadLimiter with fake writer
    threads and check that limiter.done() equals the requested number
    of copies once every thread has finished.
    """

    class KeepFakeWriterThread(threading.Thread):
        """
        Stand-in for the real KeepClient.KeepWriterThread, used only to
        exercise the ThreadLimiter.

        delay          -- seconds to sleep before reporting a result
        will_succeed   -- whether to report a successful write
        thread_limiter -- the limiter shared by all the fake writers
        """
        def __init__(self, delay, will_succeed, thread_limiter):
            super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__()
            self.limiter = thread_limiter
            self.success = will_succeed
            self.delay = delay  # in seconds

        def run(self):
            with self.limiter:
                # Only do the (simulated) write when the limiter allows it.
                if self.limiter.shall_i_proceed():
                    time.sleep(self.delay)
                    if not self.success:
                        # Simulated failure: no response, zero replicas.
                        self.limiter.save_response(None, 0)
                    else:
                        # Simulated success: one replica stored.
                        self.limiter.save_response('foo', 1)

    def test_only_write_enough_on_success(self):
        """Twice as many successful writers as copies wanted."""
        wanted = 3
        limiter = arvados.KeepClient.ThreadLimiter(want_copies=wanted, max_service_replicas=1)
        writers = []
        # Give every fake writer a different delay so that the
        # over-replication bug is revealed.
        for n in range(wanted * 2):
            writer = self.KeepFakeWriterThread(
                delay=n/10.0,
                will_succeed=True,
                thread_limiter=limiter)
            writer.start()
            writers.append(writer)
        for writer in writers:
            writer.join()
        self.assertEqual(limiter.done(), wanted)

    def test_only_write_enough_on_partial_failure(self):
        """One failing and one succeeding writer per wanted copy."""
        wanted = 3
        limiter = arvados.KeepClient.ThreadLimiter(want_copies=wanted, max_service_replicas=1)
        writers = []
        for n in range(wanted):
            # For each delay step start the failing writer first, then
            # the succeeding one.
            for outcome in (False, True):
                writer = self.KeepFakeWriterThread(
                    delay=n/10.0,
                    will_succeed=outcome,
                    thread_limiter=limiter)
                writer.start()
                writers.append(writer)
        for writer in writers:
            writer.join()
        self.assertEqual(limiter.done(), wanted)