From bef2f8e7bc3bd70b286971885294e5da2729da7b Mon Sep 17 00:00:00 2001 From: Lucas Di Pentima Date: Fri, 24 Jun 2016 19:10:22 -0300 Subject: [PATCH] PySDK put() refactoring ready, all local tests worked OK --- sdk/python/arvados/keep.py | 234 ++++++--------------------- sdk/python/tests/test_keep_client.py | 71 +------- 2 files changed, 55 insertions(+), 250 deletions(-) diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index e0e7797f80..682ca0e835 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -237,90 +237,6 @@ class KeepClient(object): DEFAULT_TIMEOUT = (2, 256, 32768) DEFAULT_PROXY_TIMEOUT = (20, 256, 32768) - class ThreadLimiter(object): - """Limit the number of threads writing to Keep at once. - - This ensures that only a number of writer threads that could - potentially achieve the desired replication level run at once. - Once the desired replication level is achieved, queued threads - are instructed not to run. - - Should be used in a "with" block. - """ - def __init__(self, want_copies, max_service_replicas): - self._started = 0 - self._want_copies = want_copies - self._done = 0 - self._thread_failures = 0 - self._response = None - self._start_lock = threading.Condition() - if (not max_service_replicas) or (max_service_replicas >= want_copies): - max_threads = 1 - else: - max_threads = math.ceil(float(want_copies) / max_service_replicas) - _logger.debug("Limiter max threads is %d", max_threads) - self._todo_lock = threading.Semaphore(max_threads) - self._done_lock = threading.Lock() - self._thread_failures_lock = threading.Lock() - self._local = threading.local() - - def __enter__(self): - self._start_lock.acquire() - if getattr(self._local, 'sequence', None) is not None: - # If the calling thread has used set_sequence(N), then - # we wait here until N other threads have started. - while self._started < self._local.sequence: - self._start_lock.wait() - self._todo_lock.acquire() - self._started += 1 - self._start_lock.notifyAll() - self._start_lock.release() - return self - - def __exit__(self, type, value, traceback): - with self._thread_failures_lock: - if self._thread_failures > 0: - self._thread_failures -= 1 - self._todo_lock.release() - - # If work is finished, release al pending threads - if not self.shall_i_proceed(): - self._todo_lock.release() - - def set_sequence(self, sequence): - self._local.sequence = sequence - - def shall_i_proceed(self): - """ - Return true if the current thread should write to Keep. - Return false otherwise. - """ - with self._done_lock: - return (self._done < self._want_copies) - - def save_response(self, response_body, replicas_stored): - """ - Records a response body (a locator, possibly signed) returned by - the Keep server, and the number of replicas it stored. - """ - if replicas_stored == 0: - # Failure notification, should start a new thread to try to reach full replication - with self._thread_failures_lock: - self._thread_failures += 1 - else: - with self._done_lock: - self._done += replicas_stored - self._response = response_body - - def response(self): - """Return the body from the response to a PUT request.""" - with self._done_lock: - return self._response - - def done(self): - """Return the total number of replicas successfully stored.""" - with self._done_lock: - return self._done class KeepService(object): """Make requests to a single Keep service, and track results. @@ -571,13 +487,15 @@ class KeepClient(object): Queue.Queue.__init__(self) # Old-style superclass self.wanted_copies = copies self.successful_copies = 0 + self.response = None self.successful_copies_lock = threading.Lock() self.retries = copies self.retries_notification = threading.Condition() - def write_success(self, replicas_nr): + def write_success(self, response, replicas_nr): with self.successful_copies_lock: self.successful_copies += replicas_nr + self.response = response def write_fail(self, ks, status_code): with self.retries_notification: @@ -589,38 +507,47 @@ class KeepClient(object): class KeepWriterThreadPool(object): - def __init__(self, data, data_hash, copies=2, num_threads=None): + def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None): + self.total_task_nr = 0 self.wanted_copies = copies - if num_threads is None: - num_threads = copies + if (not max_service_replicas) or (max_service_replicas >= copies): + num_threads = 1 + else: + num_threads = int(math.ceil(float(copies) / max_service_replicas)) + _logger.debug("Pool max threads is %d", num_threads) self.workers = [] self.queue = KeepClient.KeepWriterQueue(copies) - # Start workers + # Create workers for _ in range(num_threads): - w = KeepClient.KeepWriterThreadNew(self.queue, data, data_hash) + w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) self.workers.append(w) def add_task(self, ks, service_root): self.queue.put((ks, service_root)) + self.total_task_nr += 1 def done(self): return self.queue.successful_copies - def start(self): + def join(self): + # Start workers for worker in self.workers: worker.start() - - def join(self): + # Wait for finished work self.queue.join() with self.queue.retries_notification: self.queue.retries_notification.notify_all() for worker in self.workers: worker.join() + + def response(self): + return self.queue.response - class KeepWriterThreadNew(threading.Thread): - def __init__(self, queue, data, data_hash): - super(KeepClient.KeepWriterThreadNew, self).__init__() + class KeepWriterThread(threading.Thread): + def __init__(self, queue, data, data_hash, timeout=None): + super(KeepClient.KeepWriterThread, self).__init__() + self.timeout = timeout self.queue = queue self.data = data self.data_hash = data_hash @@ -636,11 +563,16 @@ class KeepClient(object): self.queue.retries -= 1 # Get to work - service, service_root = self.queue.get() - + try: + service, service_root = self.queue.get_nowait() + except Queue.Empty: + continue + if service.finished(): + self.queue.task_done() + continue success = bool(service.put(self.data_hash, self.data, - timeout=None)) + timeout=self.timeout)) result = service.last_result() if success: _logger.debug("KeepWriterThread %s succeeded %s+%i %s", @@ -652,7 +584,8 @@ class KeepClient(object): replicas_stored = int(result['headers']['x-keep-replicas-stored']) except (KeyError, ValueError): replicas_stored = 1 - self.queue.write_success(replicas_stored) + + self.queue.write_success(result['body'].strip(), replicas_stored) else: if result.get('status_code', None): _logger.debug("Request fail: PUT %s => %s %s", @@ -662,77 +595,14 @@ class KeepClient(object): self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry else: # Remove the task from the queue anyways - self.queue.get() + try: + self.queue.get_nowait() + except Queue.Empty: + continue # Mark as done so the queue can be join()ed self.queue.task_done() - class KeepWriterThread(threading.Thread): - """ - Write a blob of data to the given Keep server. On success, call - save_response() of the given ThreadLimiter to save the returned - locator. - """ - def __init__(self, keep_service, **kwargs): - super(KeepClient.KeepWriterThread, self).__init__() - self.service = keep_service - self.args = kwargs - self._success = False - - def success(self): - return self._success - - def run(self): - limiter = self.args['thread_limiter'] - sequence = self.args['thread_sequence'] - if sequence is not None: - limiter.set_sequence(sequence) - with limiter: - if not limiter.shall_i_proceed(): - # My turn arrived, but the job has been done without - # me. - return - self.run_with_limiter(limiter) - - def run_with_limiter(self, limiter): - if self.service.finished(): - return - _logger.debug("KeepWriterThread %s proceeding %s+%i %s", - str(threading.current_thread()), - self.args['data_hash'], - len(self.args['data']), - self.args['service_root']) - self._success = bool(self.service.put( - self.args['data_hash'], - self.args['data'], - timeout=self.args.get('timeout', None))) - result = self.service.last_result() - if self._success: - _logger.debug("KeepWriterThread %s succeeded %s+%i %s", - str(threading.current_thread()), - self.args['data_hash'], - len(self.args['data']), - self.args['service_root']) - # Tick the 'done' counter for the number of replica - # reported stored by the server, for the case that - # we're talking to a proxy or other backend that - # stores to multiple copies for us. - try: - replicas_stored = int(result['headers']['x-keep-replicas-stored']) - except (KeyError, ValueError): - replicas_stored = 1 - limiter.save_response(result['body'].strip(), replicas_stored) - elif result.get('status_code', None): - _logger.debug("Request fail: PUT %s => %s %s", - self.args['data_hash'], - result['status_code'], - result['body']) - if not self._success: - # Notify the failure so that the Thread limiter allows - # a new one to run. - limiter.save_response(None, 0) - - def __init__(self, api_client=None, proxy=None, timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, api_token=None, local_store=None, block_cache=None, @@ -1169,30 +1039,22 @@ class KeepClient(object): loop.save_result(error) continue - thread_limiter = KeepClient.ThreadLimiter( - copies - done, self.max_replicas_per_service) - threads = [] + writer_pool = KeepClient.KeepWriterThreadPool(data=data, + data_hash=data_hash, + copies=copies - done, + max_service_replicas=self.max_replicas_per_service, + timeout=self.current_timeout(num_retries - tries_left)) for service_root, ks in [(root, roots_map[root]) for root in sorted_roots]: if ks.finished(): continue - t = KeepClient.KeepWriterThread( - ks, - data=data, - data_hash=data_hash, - service_root=service_root, - thread_limiter=thread_limiter, - timeout=self.current_timeout(num_retries-tries_left), - thread_sequence=len(threads)) - t.start() - threads.append(t) - for t in threads: - t.join() - done += thread_limiter.done() - loop.save_result((done >= copies, len(threads))) + writer_pool.add_task(ks, service_root) + writer_pool.join() + done += writer_pool.done() + loop.save_result((done >= copies, writer_pool.total_task_nr)) if loop.success(): - return thread_limiter.response() + return writer_pool.response() if not roots_map: raise arvados.errors.KeepWriteError( "failed to write {}: no Keep services available ({})".format( @@ -1203,7 +1065,7 @@ class KeepClient(object): if roots_map[key].last_result()['error']) raise arvados.errors.KeepWriteError( "failed to write {} (wanted {} copies but wrote {})".format( - data_hash, copies, thread_limiter.done()), service_errors, label="service") + data_hash, copies, writer_pool.done()), service_errors, label="service") def local_store_put(self, data, copies=1, num_retries=None): """A stub for put(). diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py index 314b93d5c3..3befc098ed 100644 --- a/sdk/python/tests/test_keep_client.py +++ b/sdk/python/tests/test_keep_client.py @@ -1065,7 +1065,7 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase): self.check_exception(copies=2, num_retries=3) -class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.ApiClientMock): +class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock): class FakeKeepService(object): @@ -1075,6 +1075,7 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A self._result = {} self._result['headers'] = {} self._result['headers']['x-keep-replicas-stored'] = str(replicas) + self._result['body'] = 'foobar' def put(self, data_hash, data, timeout): time.sleep(self.delay) @@ -1082,6 +1083,9 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A def last_result(self): return self._result + + def finished(self): + return False def test_only_write_enough_on_success(self): @@ -1089,12 +1093,12 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A pool = arvados.KeepClient.KeepWriterThreadPool( data = 'foo', data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3', + max_service_replicas = copies, copies = copies ) for i in range(10): ks = self.FakeKeepService(delay=i/10.0, will_succeed=True) pool.add_task(ks, None) - pool.start() pool.join() self.assertEqual(pool.done(), copies) @@ -1103,6 +1107,7 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A pool = arvados.KeepClient.KeepWriterThreadPool( data = 'foo', data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3', + max_service_replicas = copies, copies = copies ) for i in range(5): @@ -1110,72 +1115,10 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A pool.add_task(ks, None) ks = self.FakeKeepService(delay=i/10.0, will_succeed=True) pool.add_task(ks, None) - pool.start() pool.join() self.assertEqual(pool.done(), copies) -class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock): - - - class KeepFakeWriterThread(threading.Thread): - """ - Just Simulating the real KeepClient.KeepWriterThread, to test the ThreadLimiter. - """ - def __init__(self, delay, will_succeed, thread_limiter): - super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__() - self.delay = delay # in seconds - self.success = will_succeed - self.limiter = thread_limiter - - def run(self): - with self.limiter: - if not self.limiter.shall_i_proceed(): - return - time.sleep(self.delay) - if self.success: - self.limiter.save_response('foo', 1) - else: - self.limiter.save_response(None, 0) - - def test_only_write_enough_on_success(self): - copies = 3 - threads = [] - limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1) - # Setting up fake writer threads with different delays so that the bug is revealed - for i in range(copies*2): - t = self.KeepFakeWriterThread( - delay=i/10.0, - will_succeed=True, - thread_limiter=limiter) - t.start() - threads.append(t) - for t in threads: - t.join() - self.assertEqual(limiter.done(), copies) - - def test_only_write_enough_on_partial_failure(self): - copies = 3 - threads = [] - limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1) - for i in range(copies): - t = self.KeepFakeWriterThread( - delay=i/10.0, - will_succeed=False, - thread_limiter=limiter) - t.start() - threads.append(t) - t = self.KeepFakeWriterThread( - delay=i/10.0, - will_succeed=True, - thread_limiter=limiter) - t.start() - threads.append(t) - for t in threads: - t.join() - self.assertEqual(limiter.done(), copies) - - @tutil.skip_sleep class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock): # Test put()s that need two distinct servers to succeed, possibly -- 2.30.2