X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0d63ac0c2486a43198eb1015ba8d1028239139ee..fd0074f2200bc41bc63be770fffbe2446fb0cc03:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index f59ec710c4..6196b50202 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -27,6 +27,22 @@ import arvados.errors import arvados.retry as retry import arvados.util +try: + # Workaround for urllib3 bug. + # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl. + # However, urllib3 prior to version 1.10 has a major bug in this feature + # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412) + # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the + # following workaround is necessary to be able to use + # the arvados python sdk with the distribution-provided packages. + import urllib3 + from pkg_resources import parse_version + if parse_version(urllib3.__version__) < parse_version('1.10'): + from urllib3.contrib import pyopenssl + pyopenssl.extract_from_urllib3() +except ImportError: + pass + _logger = logging.getLogger('arvados.keep') global_client_object = None @@ -59,7 +75,10 @@ class KeepLocator(object): if s is not None) def stripped(self): - return "%s+%i" % (self.md5sum, self.size) + if self.size is not None: + return "%s+%i" % (self.md5sum, self.size) + else: + return self.md5sum def _make_hex_prop(name, length): # Build and return a new property with the given name that @@ -404,11 +423,11 @@ class KeepClient(object): replicas_stored = int(result.headers['x-keep-replicas-stored']) except (KeyError, ValueError): replicas_stored = 1 - limiter.save_response(result.text.strip(), replicas_stored) + limiter.save_response(result.content.strip(), replicas_stored) elif status is not None: _logger.debug("Request fail: PUT %s => %s %s", self.args['data_hash'], status, - self.service.last_result.text) + self.service.last_result.content) def __init__(self, api_client=None, proxy=None, @@ -418,35 +437,57 @@ class KeepClient(object): """Initialize a new KeepClient. Arguments: - * api_client: The API client to use to find Keep services. If not + :api_client: + The API client to use to find Keep services. If not provided, KeepClient will build one from available Arvados configuration. - * proxy: If specified, this KeepClient will send requests to this - Keep proxy. Otherwise, KeepClient will fall back to the setting - of the ARVADOS_KEEP_PROXY configuration setting. If you want to - ensure KeepClient does not use a proxy, pass in an empty string. - * timeout: The timeout (in seconds) for HTTP requests to Keep + + :proxy: + If specified, this KeepClient will send requests to this Keep + proxy. Otherwise, KeepClient will fall back to the setting of the + ARVADOS_KEEP_PROXY configuration setting. If you want to ensure + KeepClient does not use a proxy, pass in an empty string. + + :timeout: + The initial timeout (in seconds) for HTTP requests to Keep non-proxy servers. A tuple of two floats is interpreted as (connection_timeout, read_timeout): see http://docs.python-requests.org/en/latest/user/advanced/#timeouts. + Because timeouts are often a result of transient server load, the + actual connection timeout will be increased by a factor of two on + each retry. Default: (2, 300). - * proxy_timeout: The timeout (in seconds) for HTTP requests to + + :proxy_timeout: + The initial timeout (in seconds) for HTTP requests to Keep proxies. A tuple of two floats is interpreted as - (connection_timeout, read_timeout). Default: (20, 300). - * api_token: If you're not using an API client, but only talking + (connection_timeout, read_timeout). The behavior described + above for adjusting connection timeouts on retry also applies. + Default: (20, 300). + + :api_token: + If you're not using an API client, but only talking directly to a Keep proxy, this parameter specifies an API token to authenticate Keep requests. It is an error to specify both api_client and api_token. If you specify neither, KeepClient will use one available from the Arvados configuration. - * local_store: If specified, this KeepClient will bypass Keep + + :local_store: + If specified, this KeepClient will bypass Keep services, and save data to the named directory. If unspecified, KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE environment variable. If you want to ensure KeepClient does not use local storage, pass in an empty string. This is primarily intended to mock a server for testing. - * num_retries: The default number of times to retry failed requests. + + :num_retries: + The default number of times to retry failed requests. This will be used as the default num_retries value when get() and put() are called. Default 0. + + :session: + The requests.Session object to use for get() and put() requests. + Will create one if not specified. """ self.lock = threading.Lock() if proxy is None: @@ -494,14 +535,20 @@ class KeepClient(object): self.using_proxy = None self._static_services_list = False - def current_timeout(self): - """Return the appropriate timeout to use for this client: the proxy - timeout setting if the backend service is currently a proxy, - the regular timeout setting otherwise. + def current_timeout(self, attempt_number): + """Return the appropriate timeout to use for this client. + + The proxy timeout setting if the backend service is currently a proxy, + the regular timeout setting otherwise. The `attempt_number` indicates + how many times the operation has been tried already (starting from 0 + for the first try), and scales the connection timeout portion of the + return value accordingly. + """ # TODO(twp): the timeout should be a property of a # KeepService, not a KeepClient. See #4488. - return self.proxy_timeout if self.using_proxy else self.timeout + t = self.proxy_timeout if self.using_proxy else self.timeout + return (t[0] * (1 << attempt_number), t[1]) def build_services_list(self, force_rebuild=False): if (self._static_services_list or @@ -650,7 +697,7 @@ class KeepClient(object): for root in (local_roots + hint_roots) if roots_map[root].usable()] for keep_service in services_to_try: - blob = keep_service.get(locator, timeout=self.current_timeout()) + blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left)) if blob is not None: break loop.save_result((blob, len(services_to_try))) @@ -681,7 +728,7 @@ class KeepClient(object): "{} not found".format(loc_s), service_errors) else: raise arvados.errors.KeepReadError( - "failed to read {}".format(loc_s), service_errors) + "failed to read {}".format(loc_s), service_errors, label="service") @retry.retry_method def put(self, data, copies=2, num_retries=None): @@ -702,6 +749,12 @@ class KeepClient(object): exponential backoff. The default value is set when the KeepClient is initialized. """ + + if isinstance(data, unicode): + data = data.encode("ascii") + elif not isinstance(data, str): + raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'") + data_hash = hashlib.md5(data).hexdigest() if copies < 1: return data_hash @@ -733,7 +786,7 @@ class KeepClient(object): data_hash=data_hash, service_root=service_root, thread_limiter=thread_limiter, - timeout=self.current_timeout()) + timeout=self.current_timeout(num_retries-tries_left)) t.start() threads.append(t) for t in threads: @@ -752,7 +805,7 @@ class KeepClient(object): if not roots_map[key].success_flag) raise arvados.errors.KeepWriteError( "failed to write {} (wanted {} copies but wrote {})".format( - data_hash, copies, thread_limiter.done()), service_errors) + data_hash, copies, thread_limiter.done()), service_errors, label="service") def local_store_put(self, data, copies=1, num_retries=None): """A stub for put().