From 9ccdea630f00035e96eb2bac539a4bd0d3df196d Mon Sep 17 00:00:00 2001 From: Lucas Di Pentima Date: Fri, 24 Jun 2016 12:34:55 -0300 Subject: [PATCH] Some tests done on new code --- sdk/python/arvados/keep.py | 20 ++++++----- sdk/python/tests/test_keep_client.py | 50 ++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 5d511e4051..e0e7797f80 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -568,7 +568,7 @@ class KeepClient(object): class KeepWriterQueue(Queue.Queue): def __init__(self, copies): - super(KeepClient.KeepWriterQueue, self).__init__() + Queue.Queue.__init__(self) # Old-style superclass self.wanted_copies = copies self.successful_copies = 0 self.successful_copies_lock = threading.Lock() @@ -589,18 +589,21 @@ class KeepClient(object): class KeepWriterThreadPool(object): - def __init__(self, data, data_hash, num_threads, copies=2): + def __init__(self, data, data_hash, copies=2, num_threads=None): self.wanted_copies = copies + if num_threads is None: + num_threads = copies self.workers = [] self.queue = KeepClient.KeepWriterQueue(copies) # Start workers for _ in range(num_threads): - self.workers.append(KeepClient.KeepWriterThreadNew(self.queue, data, data_hash)) + w = KeepClient.KeepWriterThreadNew(self.queue, data, data_hash) + self.workers.append(w) def add_task(self, ks, service_root): self.queue.put((ks, service_root)) - def successful_copies(self): + def done(self): return self.queue.successful_copies def start(self): @@ -621,7 +624,6 @@ class KeepClient(object): self.queue = queue self.data = data self.data_hash = data_hash - self.daemon = True def run(self): while not self.queue.empty(): @@ -636,9 +638,9 @@ class KeepClient(object): # Get to work service, service_root = self.queue.get() - success = bool(self.service.put(self.data_hash, - self.data, - timeout=None)) + success = bool(service.put(self.data_hash, + self.data, + timeout=None)) result = service.last_result() if success: _logger.debug("KeepWriterThread %s succeeded %s+%i %s", @@ -1153,7 +1155,7 @@ class KeepClient(object): headers = {} # Tell the proxy how many copies we want it to store - headers['X-Keep-Desired-Replication'] = str(copies) + headers['X-Keep-Desired-Replicas'] = str(copies) roots_map = {} loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py index 977f05ed8e..314b93d5c3 100644 --- a/sdk/python/tests/test_keep_client.py +++ b/sdk/python/tests/test_keep_client.py @@ -1065,6 +1065,56 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase): self.check_exception(copies=2, num_retries=3) +class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.ApiClientMock): + + + class FakeKeepService(object): + def __init__(self, delay, will_succeed, replicas=1): + self.delay = delay + self.success = will_succeed + self._result = {} + self._result['headers'] = {} + self._result['headers']['x-keep-replicas-stored'] = str(replicas) + + def put(self, data_hash, data, timeout): + time.sleep(self.delay) + return self.success + + def last_result(self): + return self._result + + + def test_only_write_enough_on_success(self): + copies = 3 + pool = arvados.KeepClient.KeepWriterThreadPool( + data = 'foo', + data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3', + copies = copies + ) + for i in range(10): + ks = self.FakeKeepService(delay=i/10.0, will_succeed=True) + pool.add_task(ks, None) + pool.start() + pool.join() + self.assertEqual(pool.done(), copies) + + def test_only_write_enough_on_partial_success(self): + copies = 3 + pool = arvados.KeepClient.KeepWriterThreadPool( + data = 'foo', + data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3', + copies = copies + ) + for i in range(5): + ks = self.FakeKeepService(delay=i/10.0, will_succeed=False) + pool.add_task(ks, None) + ks = self.FakeKeepService(delay=i/10.0, will_succeed=True) + pool.add_task(ks, None) + pool.start() + pool.join() + self.assertEqual(pool.done(), copies) + + class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock): -- 2.39.5