X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/20ad8adb02808a85ac38f865fd870b00fb400b74..1e10339657c1dee2b71b4d10eddffd0a35c949b3:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 50b7e01939..8ed86fd79e 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -241,19 +241,34 @@ class KeepClient(object): Should be used in a "with" block. """ def __init__(self, todo): + self._started = 0 self._todo = todo self._done = 0 self._response = None + self._start_lock = threading.Condition() self._todo_lock = threading.Semaphore(todo) self._done_lock = threading.Lock() + self._local = threading.local() def __enter__(self): + self._start_lock.acquire() + if getattr(self._local, 'sequence', None) is not None: + # If the calling thread has used set_sequence(N), then + # we wait here until N other threads have started. + while self._started < self._local.sequence: + self._start_lock.wait() self._todo_lock.acquire() + self._started += 1 + self._start_lock.notifyAll() + self._start_lock.release() return self def __exit__(self, type, value, traceback): self._todo_lock.release() + def set_sequence(self, sequence): + self._local.sequence = sequence + def shall_i_proceed(self): """ Return true if the current thread should do stuff. Return @@ -517,7 +532,11 @@ class KeepClient(object): return self._success def run(self): - with self.args['thread_limiter'] as limiter: + limiter = self.args['thread_limiter'] + sequence = self.args['thread_sequence'] + if sequence is not None: + limiter.set_sequence(sequence) + with limiter: if not limiter.shall_i_proceed(): # My turn arrived, but the job has been done without # me. @@ -651,7 +670,7 @@ class KeepClient(object): self._writable_services = self._keep_services self.using_proxy = True self._static_services_list = True - self.max_replicas_per_service = None + self.max_replicas_per_service = 1 else: # It's important to avoid instantiating an API client # unless we actually need one, for testing's sake. @@ -664,7 +683,7 @@ class KeepClient(object): self._writable_services = None self.using_proxy = None self._static_services_list = False - self.max_replicas_per_service = None + self.max_replicas_per_service = 1 def current_timeout(self, attempt_number): """Return the appropriate timeout to use for this client. @@ -722,11 +741,10 @@ class KeepClient(object): self.using_proxy = any(ks.get('service_type') == 'proxy' for ks in self._keep_services) - # Set max_replicas_per_service to 1 for disk typed services. - # In case of, non-disk typed services, we will use None to mean unknown. - self.max_replicas_per_service = 1 + # For disk type services, max_replicas_per_service is 1 + # It is unknown or unlimited for non-disk typed services. for ks in accessible: - if ('disk' != ks.get('service_type')) and not ks.get('read_only'): + if ('disk' != ks.get('service_type')) and (not ks.get('read_only')): self.max_replicas_per_service = None def _service_weight(self, data_hash, service_uuid): @@ -951,9 +969,10 @@ class KeepClient(object): thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies) loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) + thread_sequence = 0 for tries_left in loop: try: - local_roots = self.map_new_services( + sorted_roots = self.map_new_services( roots_map, locator, force_rebuild=(tries_left < num_retries), need_writable=True, **headers) except Exception as error: @@ -961,7 +980,8 @@ class KeepClient(object): continue threads = [] - for service_root, ks in roots_map.iteritems(): + for service_root, ks in [(root, roots_map[root]) + for root in sorted_roots]: if ks.finished(): continue t = KeepClient.KeepWriterThread( @@ -970,9 +990,11 @@ class KeepClient(object): data_hash=data_hash, service_root=service_root, thread_limiter=thread_limiter, - timeout=self.current_timeout(num_retries-tries_left)) + timeout=self.current_timeout(num_retries-tries_left), + thread_sequence=thread_sequence) t.start() threads.append(t) + thread_sequence += 1 for t in threads: t.join() loop.save_result((thread_limiter.done() >= copies, len(threads))) @@ -985,7 +1007,7 @@ class KeepClient(object): data_hash, loop.last_result())) else: service_errors = ((key, roots_map[key].last_result()['error']) - for key in local_roots + for key in sorted_roots if roots_map[key].last_result()['error']) raise arvados.errors.KeepWriteError( "failed to write {} (wanted {} copies but wrote {})".format(