replicas_stored = int(result.headers['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(result.text.strip(), replicas_stored)
+ limiter.save_response(result.content.strip(), replicas_stored)
elif status is not None:
_logger.debug("Request fail: PUT %s => %s %s",
self.args['data_hash'], status,
- self.service.last_result.text)
+ self.service.last_result.content)
def __init__(self, api_client=None, proxy=None,
"""Initialize a new KeepClient.
Arguments:
- * api_client: The API client to use to find Keep services. If not
+ :api_client:
+ The API client to use to find Keep services. If not
provided, KeepClient will build one from available Arvados
configuration.
- * proxy: If specified, this KeepClient will send requests to this
- Keep proxy. Otherwise, KeepClient will fall back to the setting
- of the ARVADOS_KEEP_PROXY configuration setting. If you want to
- ensure KeepClient does not use a proxy, pass in an empty string.
- * timeout: The timeout (in seconds) for HTTP requests to Keep
+
+ :proxy:
+ If specified, this KeepClient will send requests to this Keep
+ proxy. Otherwise, KeepClient will fall back to the value of the
+ ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
+ KeepClient does not use a proxy, pass in an empty string.
+
+ :timeout:
+ The timeout (in seconds) for HTTP requests to Keep
non-proxy servers. A tuple of two floats is interpreted as
(connection_timeout, read_timeout): see
http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
Default: (2, 300).
- * proxy_timeout: The timeout (in seconds) for HTTP requests to
+
+ :proxy_timeout:
+ The timeout (in seconds) for HTTP requests to
Keep proxies. A tuple of two floats is interpreted as
(connection_timeout, read_timeout). Default: (20, 300).
- * api_token: If you're not using an API client, but only talking
+
+ :api_token:
+ If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
api_client and api_token. If you specify neither, KeepClient
will use one available from the Arvados configuration.
- * local_store: If specified, this KeepClient will bypass Keep
+
+ :local_store:
+ If specified, this KeepClient will bypass Keep
services, and save data to the named directory. If unspecified,
KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
- * num_retries: The default number of times to retry failed requests.
+
+ :num_retries:
+ The default number of times to retry failed requests.
This will be used as the default num_retries value when get() and
put() are called. Default 0.
+
+ :session:
+ The requests.Session object to use for get() and put() requests.
+ Will create one if not specified.
"""
self.lock = threading.Lock()
if proxy is None:
else:
return None
def get_from_cache(self, loc):
    """Fetch a block only if it is in the cache, otherwise return None.

    Arguments:

    :loc:
      The key passed to self.block_cache.get() to look up the block.

    Returns the value of the cache slot's get() when a slot exists and
    its ready event is set; returns None in every other case.  Only
    self.block_cache is consulted by this method.
    """
    slot = self.block_cache.get(loc)
    # A cache miss is assumed to be reported as None (TODO confirm against
    # the block cache implementation).  Without this guard a miss would
    # raise AttributeError on `slot.ready` instead of returning None as
    # the docstring promises.
    if slot is None or not slot.ready.is_set():
        return None
    return slot.get()
+
@retry.retry_method
- def get(self, loc_s, num_retries=None, cache_only=False):
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
to fetch data from every available Keep service, along with any
that are named in location hints in the locator. The default value
is set when the KeepClient is initialized.
- * cache_only: If true, return the block data only if already present in
- cache, otherwise return None.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
-
- if cache_only:
- slot = self.block_cache.get(expect_hash)
- if slot.ready.is_set():
- return slot.get()
- else:
- return None
-
slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
"{} not found".format(loc_s), service_errors)
else:
raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s), service_errors)
+ "failed to read {}".format(loc_s), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None):
exponential backoff. The default value is set when the
KeepClient is initialized.
"""
+
+ if isinstance(data, unicode):
+ data = data.encode("ascii")
+ elif not isinstance(data, str):
+ raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
return data_hash
if not roots_map[key].success_flag)
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
- data_hash, copies, thread_limiter.done()), service_errors)
+ data_hash, copies, thread_limiter.done()), service_errors, label="service")
+
+ def local_store_put(self, data, copies=1, num_retries=None):
+ """A stub for put().
+
+ This method is used in place of the real put() method when
+ using local storage (see constructor's local_store argument).
- # Local storage methods need no-op num_retries arguments to keep
- # integration tests happy. With better isolation they could
- # probably be removed again.
- def local_store_put(self, data, num_retries=0):
+ copies and num_retries arguments are ignored: they are here
+ only for the sake of offering the same call signature as
+ put().
+
+ Data stored this way can be retrieved via local_store_get().
+ """
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s, num_retries=0):
+ def local_store_get(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
try:
locator = KeepLocator(loc_s)
except ValueError: