self.ready.set()
def size(self):
    """Return the length of ``self.content``, or 0 when it is ``None``."""
    # Identity comparison: ``== None`` would dispatch to a custom __eq__
    # on the content object, while ``is None`` tests only for the None
    # singleton.
    if self.content is None:
        return 0
    return len(self.content)
resp_md5 = hashlib.md5(content).hexdigest()
if resp_md5 == locator.md5sum:
return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
return None
def put(self, http, hash_s, body):
def __init__(self, api_client=None, proxy=None, timeout=300,
- api_token=None, local_store=None, block_cache=None):
+ api_token=None, local_store=None, block_cache=None,
+ num_retries=0):
"""Initialize a new KeepClient.
Arguments:
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
+ * num_retries: The default number of times to retry failed requests.
+ This will be used as the default num_retries value when get() and
+ put() are called. Default 0.
"""
self.lock = threading.Lock()
if proxy is None:
proxy = config.get('ARVADOS_KEEP_PROXY')
if api_token is None:
- api_token = config.get('ARVADOS_API_TOKEN')
+ if api_client is None:
+ api_token = config.get('ARVADOS_API_TOKEN')
+ else:
+ api_token = api_client.api_token
elif api_client is not None:
raise ValueError(
"can't build KeepClient with both API client and token")
self.put = self.local_store_put
else:
self.timeout = timeout
-
+ self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
proxy += '/'
if not keep_services:
raise arvados.errors.NoKeepServersError()
- self.using_proxy = (keep_services[0].get('service_type') ==
- 'proxy')
+ self.using_proxy = any(ks.get('service_type') == 'proxy'
+ for ks in keep_services)
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_services)
+ roots = ("{}://[{}]:{:d}/".format(
+ 'https' if ks['service_ssl_flag'] else 'http',
+ ks['service_host'],
+ ks['service_port'])
+ for ks in keep_services)
self.service_roots = sorted(set(roots))
_logger.debug(str(self.service_roots))
else:
return None
- def get(self, loc_s, num_retries=0):
+ @retry.retry_method
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
*each* Keep server if it returns temporary failures, with
exponential backoff. Note that, in each loop, the method may try
to fetch data from every available Keep service, along with any
- that are named in location hints in the locator. Default 0.
+ that are named in location hints in the locator. The default value
+ is set when the KeepClient is initialized.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
else:
raise arvados.errors.KeepReadError(loc_s)
- def put(self, data, copies=2, num_retries=0):
+ @retry.retry_method
+ def put(self, data, copies=2, num_retries=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
Default 2.
* num_retries: The number of times to retry PUT requests to
*each* Keep server if it returns temporary failures, with
- exponential backoff. Default 0.
+ exponential backoff. The default value is set when the
+ KeepClient is initialized.
"""
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, copies, thread_limiter.done()))
- def local_store_put(self, data):
+ # Local storage methods need no-op num_retries arguments to keep
+ # integration tests happy. With better isolation they could
+ # probably be removed again.
+ def local_store_put(self, data, num_retries=0):
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s):
+ def local_store_get(self, loc_s, num_retries=0):
try:
locator = KeepLocator(loc_s)
except ValueError: