X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fc16c2ebbb92d9b0e52b6435b717f26b9680c7e6..4df91a01ad793744d7370b1ad5837543ed76c242:/sdk/python/arvados/keep.py

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 48d115c7a4..8ed86fd79e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -76,7 +76,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} must be a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -241,19 +241,34 @@ class KeepClient(object):
         Should be used in a "with" block.
         """
         def __init__(self, todo):
+            self._started = 0
             self._todo = todo
             self._done = 0
             self._response = None
+            self._start_lock = threading.Condition()
             self._todo_lock = threading.Semaphore(todo)
             self._done_lock = threading.Lock()
+            self._local = threading.local()
 
         def __enter__(self):
+            self._start_lock.acquire()
+            if getattr(self._local, 'sequence', None) is not None:
+                # If the calling thread has used set_sequence(N), then
+                # we wait here until N other threads have started.
+                while self._started < self._local.sequence:
+                    self._start_lock.wait()
             self._todo_lock.acquire()
+            self._started += 1
+            self._start_lock.notifyAll()
+            self._start_lock.release()
             return self
 
         def __exit__(self, type, value, traceback):
             self._todo_lock.release()
 
+        def set_sequence(self, sequence):
+            self._local.sequence = sequence
+
         def shall_i_proceed(self):
             """
             Return true if the current thread should do stuff. Return
@@ -517,7 +532,11 @@ class KeepClient(object):
             return self._success
 
         def run(self):
-            with self.args['thread_limiter'] as limiter:
+            limiter = self.args['thread_limiter']
+            sequence = self.args['thread_sequence']
+            if sequence is not None:
+                limiter.set_sequence(sequence)
+            with limiter:
                 if not limiter.shall_i_proceed():
                     # My turn arrived, but the job has been done without
                     # me.
@@ -648,8 +667,10 @@ class KeepClient(object):
                 'uuid': 'proxy',
                 '_service_root': proxy,
                 }]
+            self._writable_services = self._keep_services
             self.using_proxy = True
             self._static_services_list = True
+            self.max_replicas_per_service = 1
         else:
             # It's important to avoid instantiating an API client
             # unless we actually need one, for testing's sake.
@@ -659,8 +680,10 @@ class KeepClient(object):
             self.api_token = api_client.api_token
             self._gateway_services = {}
             self._keep_services = None
+            self._writable_services = None
             self.using_proxy = None
             self._static_services_list = False
+            self.max_replicas_per_service = 1
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -711,10 +734,18 @@ class KeepClient(object):
             self._keep_services = [
                 ks for ks in accessible
                 if ks.get('service_type') in ['disk', 'proxy']]
+            self._writable_services = [
+                ks for ks in accessible
+                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
             _logger.debug(str(self._keep_services))
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
                                    for ks in self._keep_services)
+            # For disk type services, max_replicas_per_service is 1
+            # It is unknown or unlimited for non-disk typed services.
+            for ks in accessible:
+                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
+                    self.max_replicas_per_service = None
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -725,7 +756,7 @@ class KeepClient(object):
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
-    def weighted_service_roots(self, locator, force_rebuild=False):
+    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         the given hash+hints.
@@ -733,7 +764,6 @@ class KeepClient(object):
         self.build_services_list(force_rebuild)
 
         sorted_roots = []
-
         # Use the services indicated by the given +K@... remote
         # service hints, if any are present and can be resolved to a
         # URI.
@@ -750,21 +780,24 @@ class KeepClient(object):
         # Sort the available local services by weight (heaviest first)
         # for this locator, and return their service_roots (base URIs)
         # in that order.
+        use_services = self._keep_services
+        if need_writable:
+            use_services = self._writable_services
         sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
-                self._keep_services,
+                use_services,
                 reverse=True,
                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, locator, force_rebuild, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.weighted_service_roots(locator, force_rebuild)
+        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(
@@ -858,7 +891,8 @@ class KeepClient(object):
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries))
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=False)
             except Exception as error:
                 loop.save_result(error)
                 continue
@@ -920,32 +954,34 @@ class KeepClient(object):
         if isinstance(data, unicode):
             data = data.encode("ascii")
         elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
 
         data_hash = hashlib.md5(data).hexdigest()
+        loc_s = data_hash + '+' + str(len(data))
         if copies < 1:
-            return data_hash
-        locator = KeepLocator(data_hash + '+' + str(len(data)))
+            return loc_s
+        locator = KeepLocator(loc_s)
 
         headers = {}
-        if self.using_proxy:
-            # Tell the proxy how many copies we want it to store
-            headers['X-Keep-Desired-Replication'] = str(copies)
+        # Tell the proxy how many copies we want it to store
+        headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(copies)
+        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
+        thread_sequence = 0
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
+                sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries), **headers)
+                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
             threads = []
-            for service_root, ks in roots_map.iteritems():
+            for service_root, ks in [(root, roots_map[root])
+                                     for root in sorted_roots]:
                 if ks.finished():
                     continue
                 t = KeepClient.KeepWriterThread(
@@ -954,9 +990,11 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.current_timeout(num_retries-tries_left))
+                    timeout=self.current_timeout(num_retries-tries_left),
+                    thread_sequence=thread_sequence)
                 t.start()
                 threads.append(t)
+                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
@@ -969,7 +1007,7 @@ class KeepClient(object):
                 data_hash, loop.last_result()))
         else:
            service_errors = ((key, roots_map[key].last_result()['error'])
-                              for key in local_roots
+                              for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
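
The most substantive change in this diff is the start-sequencing added to KeepClient.ThreadLimiter: a writer thread that calls set_sequence(N) before entering the "with" block now waits until N other threads have entered, so KeepClient.put can start its writers in the probe order of sorted_roots instead of all at once. The following standalone sketch (hypothetical class name SequencedStartGate; an illustration of the same Condition-based pattern under that reading of the diff, not the Arvados code itself) shows how the gating behaves:

import threading

class SequencedStartGate(object):
    """Minimal sketch of ThreadLimiter's new start gating (names are illustrative)."""
    def __init__(self):
        self._started = 0
        self._cond = threading.Condition()
        self._local = threading.local()
        self.entry_order = []

    def set_sequence(self, sequence):
        # Remember, per calling thread, how many earlier entries to wait for.
        self._local.sequence = sequence

    def __enter__(self):
        with self._cond:
            seq = getattr(self._local, 'sequence', None)
            if seq is not None:
                # Block until `seq` other threads have already entered.
                while self._started < seq:
                    self._cond.wait()
            self.entry_order.append(seq)  # recorded under the lock, so order is deterministic
            self._started += 1
            self._cond.notify_all()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        return False

def worker(gate, n):
    gate.set_sequence(n)
    with gate:
        pass  # a real writer would contact its Keep service here

gate = SequencedStartGate()
threads = [threading.Thread(target=worker, args=(gate, n)) for n in range(4)]
for t in reversed(threads):   # start the threads out of order on purpose
    t.start()
for t in threads:
    t.join()
print(gate.entry_order)       # [0, 1, 2, 3]: entry follows the assigned sequence

In the diff itself the gate is combined with ThreadLimiter's existing semaphore, and KeepClient.put hands each KeepWriterThread a thread_sequence matching its position in sorted_roots.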