X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7563fb986662a066f0aa3a9c4c1dd35159fb69cc..c3f49b76d173818386f5c65db46b353e1d334d1e:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index b3d64a4198..ec9f6f6422 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -2,6 +2,7 @@ import cStringIO import datetime import hashlib import logging +import math import os import pycurl import Queue @@ -219,21 +220,27 @@ class KeepClient(object): DEFAULT_PROXY_TIMEOUT = (20, 300) class ThreadLimiter(object): - """ - Limit the number of threads running at a given time to - {desired successes} minus {successes reported}. When successes - reported == desired, wake up the remaining threads and tell - them to quit. + """Limit the number of threads writing to Keep at once. + + This ensures that only a number of writer threads that could + potentially achieve the desired replication level run at once. + Once the desired replication level is achieved, queued threads + are instructed not to run. Should be used in a "with" block. """ - def __init__(self, todo): + def __init__(self, want_copies, max_service_replicas): self._started = 0 - self._todo = todo + self._want_copies = want_copies self._done = 0 self._response = None self._start_lock = threading.Condition() - self._todo_lock = threading.Semaphore(todo) + if (not max_service_replicas) or (max_service_replicas >= want_copies): + max_threads = 1 + else: + max_threads = math.ceil(float(want_copies) / max_service_replicas) + _logger.debug("Limiter max threads is %d", max_threads) + self._todo_lock = threading.Semaphore(max_threads) self._done_lock = threading.Lock() self._local = threading.local() @@ -258,34 +265,28 @@ class KeepClient(object): def shall_i_proceed(self): """ - Return true if the current thread should do stuff. Return - false if the current thread should just stop. + Return true if the current thread should write to Keep. + Return false otherwise. """ with self._done_lock: - return (self._done < self._todo) + return (self._done < self._want_copies) def save_response(self, response_body, replicas_stored): """ Records a response body (a locator, possibly signed) returned by - the Keep server. It is not necessary to save more than - one response, since we presume that any locator returned - in response to a successful request is valid. + the Keep server, and the number of replicas it stored. """ with self._done_lock: self._done += replicas_stored self._response = response_body def response(self): - """ - Returns the body from the response to a PUT request. - """ + """Return the body from the response to a PUT request.""" with self._done_lock: return self._response def done(self): - """ - Return how many successes were reported. - """ + """Return the total number of replicas successfully stored.""" with self._done_lock: return self._done @@ -645,6 +646,7 @@ class KeepClient(object): self.put = self.local_store_put else: self.num_retries = num_retries + self.max_replicas_per_service = None if proxy: if not proxy.endswith('/'): proxy += '/' @@ -658,7 +660,6 @@ class KeepClient(object): self._writable_services = self._keep_services self.using_proxy = True self._static_services_list = True - self.max_replicas_per_service = None else: # It's important to avoid instantiating an API client # unless we actually need one, for testing's sake. @@ -671,7 +672,6 @@ class KeepClient(object): self._writable_services = None self.using_proxy = None self._static_services_list = False - self.max_replicas_per_service = 1 def current_timeout(self, attempt_number): """Return the appropriate timeout to use for this client. @@ -955,10 +955,8 @@ class KeepClient(object): # Tell the proxy how many copies we want it to store headers['X-Keep-Desired-Replication'] = str(copies) roots_map = {} - thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies) loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) - thread_sequence = 0 for tries_left in loop: try: sorted_roots = self.map_new_services( @@ -968,6 +966,8 @@ class KeepClient(object): loop.save_result(error) continue + thread_limiter = KeepClient.ThreadLimiter( + copies, self.max_replicas_per_service) threads = [] for service_root, ks in [(root, roots_map[root]) for root in sorted_roots]: @@ -980,10 +980,9 @@ class KeepClient(object): service_root=service_root, thread_limiter=thread_limiter, timeout=self.current_timeout(num_retries-tries_left), - thread_sequence=thread_sequence) + thread_sequence=len(threads)) t.start() threads.append(t) - thread_sequence += 1 for t in threads: t.join() loop.save_result((thread_limiter.done() >= copies, len(threads)))