import arvados.retry as retry
import arvados.util
+try:
+ # Workaround for urllib3 bug.
+ # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
+ # However, urllib3 prior to version 1.10 has a major bug in this feature
+ # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
+ # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
+ # following workaround is necessary to be able to use
+ # the arvados python sdk with the distribution-provided packages.
+    # This runs once, at module import time.
+ import urllib3
+ from pkg_resources import parse_version
+    # Only urllib3 older than 1.10 is affected; newer versions are left alone.
+ if parse_version(urllib3.__version__) < parse_version('1.10'):
+ from urllib3.contrib import pyopenssl
+        # NOTE(review): extract_from_urllib3() is presumably the inverse of
+        # pyopenssl.inject_into_urllib3(), i.e. it stops urllib3 from using
+        # pyOpenSSL -- confirm against the urllib3 docs for the pinned version.
+ pyopenssl.extract_from_urllib3()
+# Any ImportError above -- urllib3 itself, pkg_resources (setuptools), or
+# urllib3.contrib.pyopenssl -- lands here and the workaround is skipped
+# silently.
+# NOTE(review): this includes the case where urllib3 < 1.10 is installed but
+# pkg_resources is not; confirm that skipping the workaround there is intended.
+except ImportError:
+ pass
+
+# Logger for messages emitted by this module.
_logger = logging.getLogger('arvados.keep')
+# NOTE(review): module-level client slot; how it is populated is outside this view.
global_client_object = None
if s is not None)
def stripped(self):
- return "%s+%i" % (self.md5sum, self.size)
+        """Return only the md5sum and size parts of this locator.
+
+        The result is "md5sum+size"; when the size is unknown (None) the
+        bare md5sum is returned instead.
+        """
+        if self.size is None:
+            return self.md5sum
+        return "%s+%i" % (self.md5sum, self.size)
def _make_hex_prop(name, length):
# Build and return a new property with the given name that
replicas_stored = int(result.headers['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(result.text.strip(), replicas_stored)
+ limiter.save_response(result.content.strip(), replicas_stored)
elif status is not None:
_logger.debug("Request fail: PUT %s => %s %s",
self.args['data_hash'], status,
- self.service.last_result.text)
+ self.service.last_result.content)
def __init__(self, api_client=None, proxy=None,
"""Initialize a new KeepClient.
Arguments:
- * api_client: The API client to use to find Keep services. If not
+ :api_client:
+ The API client to use to find Keep services. If not
provided, KeepClient will build one from available Arvados
configuration.
- * proxy: If specified, this KeepClient will send requests to this
- Keep proxy. Otherwise, KeepClient will fall back to the setting
- of the ARVADOS_KEEP_PROXY configuration setting. If you want to
- ensure KeepClient does not use a proxy, pass in an empty string.
- * timeout: The timeout (in seconds) for HTTP requests to Keep
+
+ :proxy:
+ If specified, this KeepClient will send requests to this Keep
+ proxy. Otherwise, KeepClient will fall back to the setting of the
+ ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
+ KeepClient does not use a proxy, pass in an empty string.
+
+ :timeout:
+ The initial timeout (in seconds) for HTTP requests to Keep
non-proxy servers. A tuple of two floats is interpreted as
(connection_timeout, read_timeout): see
http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+ Because timeouts are often a result of transient server load, the
+ actual connection timeout will be increased by a factor of two on
+ each retry.
Default: (2, 300).
- * proxy_timeout: The timeout (in seconds) for HTTP requests to
+
+ :proxy_timeout:
+ The initial timeout (in seconds) for HTTP requests to
Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). Default: (20, 300).
- * api_token: If you're not using an API client, but only talking
+ (connection_timeout, read_timeout). The behavior described
+ above for adjusting connection timeouts on retry also applies.
+ Default: (20, 300).
+
+ :api_token:
+ If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
api_client and api_token. If you specify neither, KeepClient
will use one available from the Arvados configuration.
- * local_store: If specified, this KeepClient will bypass Keep
+
+ :local_store:
+ If specified, this KeepClient will bypass Keep
services, and save data to the named directory. If unspecified,
KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
- * num_retries: The default number of times to retry failed requests.
+
+ :num_retries:
+ The default number of times to retry failed requests.
This will be used as the default num_retries value when get() and
put() are called. Default 0.
+
+ :session:
+ The requests.Session object to use for get() and put() requests.
+ Will create one if not specified.
"""
self.lock = threading.Lock()
if proxy is None:
self.using_proxy = None
self._static_services_list = False
- def current_timeout(self):
- """Return the appropriate timeout to use for this client: the proxy
- timeout setting if the backend service is currently a proxy,
- the regular timeout setting otherwise.
+    def current_timeout(self, attempt_number=0):
+        """Return the appropriate timeout to use for this client.
+
+        Uses the proxy timeout setting if the backend service is currently
+        a proxy, and the regular timeout setting otherwise.
+
+        :attempt_number:
+          How many times the operation has been tried already (0 for the
+          first try).  The connection timeout is multiplied by
+          2**attempt_number; the read timeout is returned unchanged.
+          Defaults to 0 (the configured timeout), so callers written
+          against the old zero-argument signature keep working.
+
+        The configured value may be a (connection_timeout, read_timeout)
+        tuple, a single number (used for both phases), or None (no
+        timeout, returned as None).  A None element inside the tuple is
+        passed through unscaled.
+
"""
# TODO(twp): the timeout should be a property of a
# KeepService, not a KeepClient. See #4488.
- return self.proxy_timeout if self.using_proxy else self.timeout
+        t = self.proxy_timeout if self.using_proxy else self.timeout
+        if t is None:
+            # No timeout configured: nothing to scale.
+            return None
+        if isinstance(t, (tuple, list)):
+            conn_timeout, read_timeout = t[0], t[1]
+        else:
+            # A single number applies to both connect and read, as before
+            # this method started returning a tuple.
+            conn_timeout = read_timeout = t
+        if conn_timeout is not None:
+            # Back off the connection timeout only; slow servers under
+            # transient load get more time to accept on each retry.
+            conn_timeout = conn_timeout * (1 << attempt_number)
+        return (conn_timeout, read_timeout)
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
else:
return None
+    def get_from_cache(self, loc):
+        """Fetch a block only if it is in the cache, otherwise return None.
+
+        :loc:
+          A Keep locator string (a bare md5sum is also accepted).  get()
+          reserves cache slots under ``locator.md5sum``, so the locator is
+          parsed and only its md5sum is used as the cache key; size and
+          hints do not affect the lookup.
+
+        Returns the cached block data, or None when the cache has no slot
+        for the block or the slot is not ready yet (e.g. another get() is
+        still fetching it).  KeepLocator raises ValueError for a malformed
+        locator.
+        """
+        slot = self.block_cache.get(KeepLocator(loc).md5sum)
+        # block_cache.get() presumably returns None on a miss; check it so a
+        # miss returns None instead of raising AttributeError on slot.ready.
+        if slot is not None and slot.ready.is_set():
+            return slot.get()
+        return None
+
@retry.retry_method
- def get(self, loc_s, num_retries=None, cache_only=False):
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
to fetch data from every available Keep service, along with any
that are named in location hints in the locator. The default value
is set when the KeepClient is initialized.
- * cache_only: If true, return the block data only if already present in
- cache, otherwise return None.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
-
- if cache_only:
- slot = self.block_cache.get(expect_hash)
- if slot.ready.is_set():
- return slot.get()
- else:
- return None
-
slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
for root in (local_roots + hint_roots)
if roots_map[root].usable()]
for keep_service in services_to_try:
- blob = keep_service.get(locator, timeout=self.current_timeout())
+ blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
if blob is not None:
break
loop.save_result((blob, len(services_to_try)))
"{} not found".format(loc_s), service_errors)
else:
raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s), service_errors)
+ "failed to read {}".format(loc_s), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None):
exponential backoff. The default value is set when the
KeepClient is initialized.
"""
+
+ if isinstance(data, unicode):
+ data = data.encode("ascii")
+ elif not isinstance(data, str):
+ raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
return data_hash
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.current_timeout())
+ timeout=self.current_timeout(num_retries-tries_left))
t.start()
threads.append(t)
for t in threads:
if not roots_map[key].success_flag)
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
- data_hash, copies, thread_limiter.done()), service_errors)
+ data_hash, copies, thread_limiter.done()), service_errors, label="service")
+
+ def local_store_put(self, data, copies=1, num_retries=None):
+ """A stub for put().
- # Local storage methods need no-op num_retries arguments to keep
- # integration tests happy. With better isolation they could
- # probably be removed again.
- def local_store_put(self, data, num_retries=0):
+ This method is used in place of the real put() method when
+ using local storage (see constructor's local_store argument).
+
+ copies and num_retries arguments are ignored: they are here
+ only for the sake of offering the same call signature as
+ put().
+
+ Data stored this way can be retrieved via local_store_get().
+ """
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s, num_retries=0):
+ def local_store_get(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
try:
locator = KeepLocator(loc_s)
except ValueError: