X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e158f485053be1e840073b321033d60d686a55a8..ae41687970e380effecc588301069b604e0ba041:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 23d8c20db5..71dc7ce7af 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -394,11 +394,11 @@ class KeepClient(object): replicas_stored = int(result.headers['x-keep-replicas-stored']) except (KeyError, ValueError): replicas_stored = 1 - limiter.save_response(result.text.strip(), replicas_stored) + limiter.save_response(result.content.strip(), replicas_stored) elif status is not None: _logger.debug("Request fail: PUT %s => %s %s", self.args['data_hash'], status, - self.service.last_result.text) + self.service.last_result.content) def __init__(self, api_client=None, proxy=None, @@ -517,16 +517,16 @@ class KeepClient(object): r['service_port']) _logger.debug(str(self._keep_services)) - def _service_weight(self, hash, service_uuid): + def _service_weight(self, data_hash, service_uuid): """Compute the weight of a Keep service endpoint for a data block with a known hash. The weight is md5(h + u) where u is the last 15 characters of the service endpoint's UUID. """ - return hashlib.md5(hash + service_uuid[-15:]).hexdigest() + return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest() - def weighted_service_roots(self, hash, force_rebuild=False): + def weighted_service_roots(self, data_hash, force_rebuild=False): """Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash. @@ -534,14 +534,14 @@ class KeepClient(object): self.build_services_list(force_rebuild) # Sort the available services by weight (heaviest first) for - # this hash, and return their service_roots (base URIs) in - # that order. + # this data_hash, and return their service_roots (base URIs) + # in that order. sorted_roots = [ svc['_service_root'] for svc in sorted( self._keep_services, reverse=True, - key=lambda svc: self._service_weight(hash, svc['uuid']))] - _logger.debug(hash + ': ' + str(sorted_roots)) + key=lambda svc: self._service_weight(data_hash, svc['uuid']))] + _logger.debug(data_hash + ': ' + str(sorted_roots)) return sorted_roots def map_new_services(self, roots_map, md5_s, force_rebuild, **headers): @@ -643,17 +643,27 @@ class KeepClient(object): if loop.success(): return blob - # No servers fulfilled the request. Count how many responded - # "not found;" if the ratio is high enough (currently 75%), report - # Not Found; otherwise a generic error. + try: + all_roots = local_roots + hint_roots + except NameError: + # We never successfully fetched local_roots. + all_roots = hint_roots # Q: Including 403 is necessary for the Keep tests to continue # passing, but maybe they should expect KeepReadError instead? - not_founds = sum(1 for ks in roots_map.values() - if ks.last_status() in set([403, 404, 410])) - if roots_map and ((float(not_founds) / len(roots_map)) >= .75): - raise arvados.errors.NotFoundError(loc_s) + not_founds = sum(1 for key in all_roots + if roots_map[key].last_status() in {403, 404, 410}) + service_errors = ((key, roots_map[key].last_result) + for key in all_roots) + if not roots_map: + raise arvados.errors.KeepReadError( + "failed to read {}: no Keep services available ({})".format( + loc_s, loop.last_result())) + elif not_founds == len(all_roots): + raise arvados.errors.NotFoundError( + "{} not found".format(loc_s), service_errors) else: - raise arvados.errors.KeepReadError(loc_s) + raise arvados.errors.KeepReadError( + "failed to read {}".format(loc_s), service_errors) @retry.retry_method def put(self, data, copies=2, num_retries=None): @@ -674,6 +684,12 @@ class KeepClient(object): exponential backoff. The default value is set when the KeepClient is initialized. """ + + if isinstance(data, unicode): + data = data.encode("ascii") + elif not isinstance(data, str): + raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'") + data_hash = hashlib.md5(data).hexdigest() if copies < 1: return data_hash @@ -714,14 +730,30 @@ class KeepClient(object): if loop.success(): return thread_limiter.response() - raise arvados.errors.KeepWriteError( - "Write fail for %s: wanted %d but wrote %d" % - (data_hash, copies, thread_limiter.done())) - - # Local storage methods need no-op num_retries arguments to keep - # integration tests happy. With better isolation they could - # probably be removed again. - def local_store_put(self, data, num_retries=0): + if not roots_map: + raise arvados.errors.KeepWriteError( + "failed to write {}: no Keep services available ({})".format( + data_hash, loop.last_result())) + else: + service_errors = ((key, roots_map[key].last_result) + for key in local_roots + if not roots_map[key].success_flag) + raise arvados.errors.KeepWriteError( + "failed to write {} (wanted {} copies but wrote {})".format( + data_hash, copies, thread_limiter.done()), service_errors) + + def local_store_put(self, data, copies=1, num_retries=None): + """A stub for put(). + + This method is used in place of the real put() method when + using local storage (see constructor's local_store argument). + + copies and num_retries arguments are ignored: they are here + only for the sake of offering the same call signature as + put(). + + Data stored this way can be retrieved via local_store_get(). + """ md5 = hashlib.md5(data).hexdigest() locator = '%s+%d' % (md5, len(data)) with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f: @@ -730,7 +762,8 @@ class KeepClient(object): os.path.join(self.local_store, md5)) return locator - def local_store_get(self, loc_s, num_retries=0): + def local_store_get(self, loc_s, num_retries=None): + """Companion to local_store_put().""" try: locator = KeepLocator(loc_s) except ValueError: