X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/facbf48de90f600a42363b72bcb2b4929f78c73d..63b03a39adfd78961c5bbb6a3a2d02ccd8c92e4d:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index d1b07cce0d..8d0a89d777 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -404,11 +404,11 @@ class KeepClient(object): replicas_stored = int(result.headers['x-keep-replicas-stored']) except (KeyError, ValueError): replicas_stored = 1 - limiter.save_response(result.text.strip(), replicas_stored) + limiter.save_response(result.content.strip(), replicas_stored) elif status is not None: _logger.debug("Request fail: PUT %s => %s %s", self.args['data_hash'], status, - self.service.last_result.text) + self.service.last_result.content) def __init__(self, api_client=None, proxy=None, @@ -418,35 +418,52 @@ class KeepClient(object): """Initialize a new KeepClient. Arguments: - * api_client: The API client to use to find Keep services. If not + :api_client: + The API client to use to find Keep services. If not provided, KeepClient will build one from available Arvados configuration. - * proxy: If specified, this KeepClient will send requests to this - Keep proxy. Otherwise, KeepClient will fall back to the setting - of the ARVADOS_KEEP_PROXY configuration setting. If you want to - ensure KeepClient does not use a proxy, pass in an empty string. - * timeout: The timeout (in seconds) for HTTP requests to Keep + + :proxy: + If specified, this KeepClient will send requests to this Keep + proxy. Otherwise, KeepClient will fall back to the setting of the + ARVADOS_KEEP_PROXY configuration setting. If you want to ensure + KeepClient does not use a proxy, pass in an empty string. + + :timeout: + The timeout (in seconds) for HTTP requests to Keep non-proxy servers. A tuple of two floats is interpreted as (connection_timeout, read_timeout): see http://docs.python-requests.org/en/latest/user/advanced/#timeouts. 
Default: (2, 300). - * proxy_timeout: The timeout (in seconds) for HTTP requests to + + :proxy_timeout: + The timeout (in seconds) for HTTP requests to Keep proxies. A tuple of two floats is interpreted as (connection_timeout, read_timeout). Default: (20, 300). - * api_token: If you're not using an API client, but only talking + + :api_token: + If you're not using an API client, but only talking directly to a Keep proxy, this parameter specifies an API token to authenticate Keep requests. It is an error to specify both api_client and api_token. If you specify neither, KeepClient will use one available from the Arvados configuration. - * local_store: If specified, this KeepClient will bypass Keep + + :local_store: + If specified, this KeepClient will bypass Keep services, and save data to the named directory. If unspecified, KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE environment variable. If you want to ensure KeepClient does not use local storage, pass in an empty string. This is primarily intended to mock a server for testing. - * num_retries: The default number of times to retry failed requests. + + :num_retries: + The default number of times to retry failed requests. This will be used as the default num_retries value when get() and put() are called. Default 0. + + :session: + The requests.Session object to use for get() and put() requests. + Will create one if not specified. """ self.lock = threading.Lock() if proxy is None: @@ -585,8 +602,16 @@ class KeepClient(object): else: return None + def get_from_cache(self, loc): + """Fetch a block only if it is in the cache, otherwise return None.""" + slot = self.block_cache.get(loc) + if slot.ready.is_set(): + return slot.get() + else: + return None + @retry.retry_method - def get(self, loc_s, num_retries=None, cache_only=False): + def get(self, loc_s, num_retries=None): """Get data from Keep. This method fetches one or more blocks of data from Keep. 
It @@ -605,21 +630,11 @@ class KeepClient(object): to fetch data from every available Keep service, along with any that are named in location hints in the locator. The default value is set when the KeepClient is initialized. - * cache_only: If true, return the block data only if already present in - cache, otherwise return None. """ if ',' in loc_s: return ''.join(self.get(x) for x in loc_s.split(',')) locator = KeepLocator(loc_s) expect_hash = locator.md5sum - - if cache_only: - slot = self.block_cache.get(expect_hash) - if slot.ready.is_set(): - return slot.get() - else: - return None - slot, first = self.block_cache.reserve_cache(expect_hash) if not first: v = slot.get() @@ -683,7 +698,7 @@ class KeepClient(object): "{} not found".format(loc_s), service_errors) else: raise arvados.errors.KeepReadError( - "failed to read {}".format(loc_s), service_errors) + "failed to read {}".format(loc_s), service_errors, label="service") @retry.retry_method def put(self, data, copies=2, num_retries=None): @@ -704,6 +719,12 @@ class KeepClient(object): exponential backoff. The default value is set when the KeepClient is initialized. """ + + if isinstance(data, unicode): + data = data.encode("ascii") + elif not isinstance(data, str): + raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'") + data_hash = hashlib.md5(data).hexdigest() if copies < 1: return data_hash @@ -754,12 +775,20 @@ class KeepClient(object): if not roots_map[key].success_flag) raise arvados.errors.KeepWriteError( "failed to write {} (wanted {} copies but wrote {})".format( - data_hash, copies, thread_limiter.done()), service_errors) + data_hash, copies, thread_limiter.done()), service_errors, label="service") + + def local_store_put(self, data, copies=1, num_retries=None): + """A stub for put(). + + This method is used in place of the real put() method when + using local storage (see constructor's local_store argument). 
- # Local storage methods need no-op num_retries arguments to keep - # integration tests happy. With better isolation they could - # probably be removed again. - def local_store_put(self, data, num_retries=0): + copies and num_retries arguments are ignored: they are here + only for the sake of offering the same call signature as + put(). + + Data stored this way can be retrieved via local_store_get(). + """ md5 = hashlib.md5(data).hexdigest() locator = '%s+%d' % (md5, len(data)) with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f: @@ -768,7 +797,8 @@ class KeepClient(object): os.path.join(self.local_store, md5)) return locator - def local_store_get(self, loc_s, num_retries=0): + def local_store_get(self, loc_s, num_retries=None): + """Companion to local_store_put().""" try: locator = KeepLocator(loc_s) except ValueError: