X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/517d3fca54225873d36f94083f3b7056ce271f46..4335bce9369b5df234c5d68c4deff820ca2c28d1:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 22bf327e79..37b1c17902 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -316,7 +316,8 @@ class KeepClient(object): resp_md5 = hashlib.md5(content).hexdigest() if resp_md5 == locator.md5sum: return content - _logger.warning("Checksum fail: md5(%s) = %s", url, md5) + _logger.warning("Checksum fail: md5(%s) = %s", + url, resp_md5) return None def put(self, http, hash_s, body): @@ -391,7 +392,8 @@ class KeepClient(object): def __init__(self, api_client=None, proxy=None, timeout=300, - api_token=None, local_store=None, block_cache=None): + api_token=None, local_store=None, block_cache=None, + num_retries=0): """Initialize a new KeepClient. Arguments: @@ -415,6 +417,9 @@ class KeepClient(object): environment variable. If you want to ensure KeepClient does not use local storage, pass in an empty string. This is primarily intended to mock a server for testing. + * num_retries: The default number of times to retry failed requests. + This will be used as the default num_retries value when get() and + put() are called. Default 0. """ self.lock = threading.Lock() if proxy is None: @@ -435,7 +440,7 @@ class KeepClient(object): self.put = self.local_store_put else: self.timeout = timeout - + self.num_retries = num_retries if proxy: if not proxy.endswith('/'): proxy += '/' @@ -468,14 +473,14 @@ class KeepClient(object): if not keep_services: raise arvados.errors.NoKeepServersError() - self.using_proxy = (keep_services[0].get('service_type') == - 'proxy') + self.using_proxy = any(ks.get('service_type') == 'proxy' + for ks in keep_services) - roots = (("http%s://%s:%d/" % - ('s' if f['service_ssl_flag'] else '', - f['service_host'], - f['service_port'])) - for f in keep_services) + roots = ("{}://[{}]:{:d}/".format( + 'https' if ks['service_ssl_flag'] else 'http', + ks['service_host'], + ks['service_port']) + for ks in keep_services) self.service_roots = sorted(set(roots)) _logger.debug(str(self.service_roots)) @@ -557,7 +562,8 @@ class KeepClient(object): else: return None - def get(self, loc_s, num_retries=0): + @retry.retry_method + def get(self, loc_s, num_retries=None): """Get data from Keep. This method fetches one or more blocks of data from Keep. It @@ -574,7 +580,8 @@ class KeepClient(object): *each* Keep server if it returns temporary failures, with exponential backoff. Note that, in each loop, the method may try to fetch data from every available Keep service, along with any - that are named in location hints in the locator. Default 0. + that are named in location hints in the locator. The default value + is set when the KeepClient is initialized. """ if ',' in loc_s: return ''.join(self.get(x) for x in loc_s.split(',')) @@ -637,7 +644,8 @@ class KeepClient(object): else: raise arvados.errors.KeepReadError(loc_s) - def put(self, data, copies=2, num_retries=0): + @retry.retry_method + def put(self, data, copies=2, num_retries=None): """Save data in Keep. This method will get a list of Keep services from the API server, and @@ -652,7 +660,8 @@ class KeepClient(object): Default 2. * num_retries: The number of times to retry PUT requests to *each* Keep server if it returns temporary failures, with - exponential backoff. Default 0. + exponential backoff. The default value is set when the + KeepClient is initialized. """ data_hash = hashlib.md5(data).hexdigest() if copies < 1: @@ -698,7 +707,10 @@ class KeepClient(object): "Write fail for %s: wanted %d but wrote %d" % (data_hash, copies, thread_limiter.done())) - def local_store_put(self, data): + # Local storage methods need no-op num_retries arguments to keep + # integration tests happy. With better isolation they could + # probably be removed again. + def local_store_put(self, data, num_retries=0): md5 = hashlib.md5(data).hexdigest() locator = '%s+%d' % (md5, len(data)) with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f: @@ -707,7 +719,7 @@ class KeepClient(object): os.path.join(self.local_store, md5)) return locator - def local_store_get(self, loc_s): + def local_store_get(self, loc_s, num_retries=0): try: locator = KeepLocator(loc_s) except ValueError: