X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/adcef9f939b663d88a12d5d3597c3b0184d2579f..540b72d62a94015f116ba077e279a5f10d666778:/sdk/python/arvados/api.py diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py index 13dd8e9af8..a7f3837599 100644 --- a/sdk/python/arvados/api.py +++ b/sdk/python/arvados/api.py @@ -23,23 +23,36 @@ import re import socket import ssl import sys +import threading import time import types import apiclient +import apiclient.http from apiclient import discovery as apiclient_discovery from apiclient import errors as apiclient_errors from . import config from . import errors +from . import retry from . import util from . import cache +from .logging import GoogleHTTPClientFilter, log_handler _logger = logging.getLogger('arvados.api') +_googleapiclient_log_lock = threading.Lock() MAX_IDLE_CONNECTION_DURATION = 30 -RETRY_DELAY_INITIAL = 2 -RETRY_DELAY_BACKOFF = 2 -RETRY_COUNT = 2 + +# These constants supported our own retry logic that we've since removed in +# favor of using googleapiclient's num_retries. They're kept here purely for +# API compatibility, but set to 0 to indicate no retries happen. +RETRY_DELAY_INITIAL = 0 +RETRY_DELAY_BACKOFF = 0 +RETRY_COUNT = 0 + +# An unused HTTP 5xx status code to request a retry internally. +# See _intercept_http_request. This should not be user-visible. +_RETRY_4XX_STATUS = 545 if sys.version_info >= (3,): httplib2.SSLHandshakeError = None @@ -64,6 +77,24 @@ class OrderedJsonModel(apiclient.model.JsonModel): return body +_orig_retry_request = apiclient.http._retry_request +def _retry_request(http, num_retries, *args, **kwargs): + try: + num_retries = max(num_retries, http.num_retries) + except AttributeError: + # `http` client object does not have a `num_retries` attribute. + # It apparently hasn't gone through _patch_http_request, possibly + # because this isn't an Arvados API client. Pass through to + # avoid interfering with other Google API clients. 
+ return _orig_retry_request(http, num_retries, *args, **kwargs) + response, body = _orig_retry_request(http, num_retries, *args, **kwargs) + # If _intercept_http_request ran out of retries for a 4xx response, + # restore the original status code. + if response.status == _RETRY_4XX_STATUS: + response.status = int(response['status']) + return (response, body) +apiclient.http._retry_request = _retry_request + def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs): if not headers.get('X-Request-Id'): headers['X-Request-Id'] = self._request_id() @@ -75,12 +106,7 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs): headers['Authorization'] = 'OAuth2 %s' % self.arvados_api_token - retryable = method in [ - 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT'] - retry_count = self._retry_count if retryable else 0 - - if (not retryable and - time.time() - self._last_request_time > self._max_keepalive_idle): + if (time.time() - self._last_request_time) > self._max_keepalive_idle: # High probability of failure due to connection atrophy. Make # sure this request [re]opens a new connection by closing and # forgetting all cached connections first. @@ -88,32 +114,17 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs): conn.close() self.connections.clear() - delay = self._retry_delay_initial - for _ in range(retry_count): - self._last_request_time = time.time() - try: - return self.orig_http_request(uri, method, headers=headers, **kwargs) - except http.client.HTTPException: - _logger.debug("[%s] Retrying API request in %d s after HTTP error", - headers['X-Request-Id'], delay, exc_info=True) - except ssl.SSLCertVerificationError as e: - raise ssl.SSLCertVerificationError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." 
% (uri, e)) from None - except socket.error: - # This is the one case where httplib2 doesn't close the - # underlying connection first. Close all open - # connections, expecting this object only has the one - # connection to the API server. This is safe because - # httplib2 reopens connections when needed. - _logger.debug("[%s] Retrying API request in %d s after socket error", - headers['X-Request-Id'], delay, exc_info=True) - for conn in self.connections.values(): - conn.close() - - time.sleep(delay) - delay = delay * self._retry_delay_backoff - self._last_request_time = time.time() - return self.orig_http_request(uri, method, headers=headers, **kwargs) + try: + response, body = self.orig_http_request(uri, method, headers=headers, **kwargs) + except ssl.SSLCertVerificationError as e: + raise ssl.SSLCertVerificationError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e)) from None + # googleapiclient only retries 403, 429, and 5xx status codes. + # If we got another 4xx status that we want to retry, convert it into + # 5xx so googleapiclient handles it the way we want. 
+ if response.status in retry._HTTP_CAN_RETRY and response.status < 500: + response.status = _RETRY_4XX_STATUS + return (response, body) except Exception as e: # Prepend "[request_id] " to the error message, which we # assume is the first string argument passed to the exception @@ -124,16 +135,14 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs): raise type(e)(*e.args) raise -def _patch_http_request(http, api_token): +def _patch_http_request(http, api_token, num_retries): http.arvados_api_token = api_token http.max_request_size = 0 + http.num_retries = num_retries http.orig_http_request = http.request http.request = types.MethodType(_intercept_http_request, http) http._last_request_time = 0 http._max_keepalive_idle = MAX_IDLE_CONNECTION_DURATION - http._retry_delay_initial = RETRY_DELAY_INITIAL - http._retry_delay_backoff = RETRY_DELAY_BACKOFF - http._retry_count = RETRY_COUNT http._request_id = util.new_request_id return http @@ -182,6 +191,7 @@ def api_client( cache=True, http=None, insecure=False, + num_retries=10, request_id=None, timeout=5*60, **kwargs, @@ -219,6 +229,10 @@ def api_client( insecure: bool : If true, ignore SSL certificate validation errors. Default `False`. + num_retries: int + : The number of times to retry each API request if it encounters a + temporary failure. Default 10. + request_id: str | None : Default `X-Request-Id` header value for outgoing requests that don't already provide one. If `None` or omitted, generate a random @@ -239,15 +253,46 @@ def api_client( ) if http.timeout is None: http.timeout = timeout - http = _patch_http_request(http, token) - - svc = apiclient_discovery.build( - 'arvados', version, - cache_discovery=False, - discoveryServiceUrl=discoveryServiceUrl, - http=http, - **kwargs, + http = _patch_http_request(http, token, num_retries) + + # The first time a client is instantiated, temporarily route + # googleapiclient.http retry logs if they're not already. 
These are + # important because temporary problems fetching the discovery document + # can cause clients to appear to hang early. This can be removed after + # we have a more general story for handling googleapiclient logs (#20521). + client_logger = logging.getLogger('googleapiclient.http') + # "first time a client is instantiated" = thread that acquires this lock + # It is never released. + # googleapiclient sets up its own NullHandler so we detect if logging is + # configured by looking for a real handler anywhere in the hierarchy. + client_logger_unconfigured = _googleapiclient_log_lock.acquire(blocking=False) and all( + isinstance(handler, logging.NullHandler) + for logger_name in ['', 'googleapiclient', 'googleapiclient.http'] + for handler in logging.getLogger(logger_name).handlers ) + if client_logger_unconfigured: + client_level = client_logger.level + client_filter = GoogleHTTPClientFilter() + client_logger.addFilter(client_filter) + client_logger.addHandler(log_handler) + if logging.NOTSET < client_level < client_filter.retry_levelno: + client_logger.setLevel(client_level) + else: + client_logger.setLevel(client_filter.retry_levelno) + try: + svc = apiclient_discovery.build( + 'arvados', version, + cache_discovery=False, + discoveryServiceUrl=discoveryServiceUrl, + http=http, + num_retries=num_retries, + **kwargs, + ) + finally: + if client_logger_unconfigured: + client_logger.removeHandler(log_handler) + client_logger.removeFilter(client_filter) + client_logger.setLevel(client_level) svc.api_token = token svc.insecure = insecure svc.request_id = request_id @@ -374,6 +419,11 @@ def api(version=None, cache=True, host=None, token=None, insecure=False, like you would write in user configuration; or pass additional arguments for lower-level control over the client. + This function returns an `arvados.safeapi.ThreadSafeApiCache`, an + API-compatible wrapper around `googleapiclient.discovery.Resource`.
If + you're handling concurrency yourself and/or your application is very + performance-sensitive, consider calling `api_client` directly. + Arguments: version: str | None @@ -398,21 +448,20 @@ def api(version=None, cache=True, host=None, token=None, insecure=False, Other arguments are passed directly to `api_client`. See that function's docstring for more information about their meaning. """ - if discoveryServiceUrl or host or token: - # We pass `insecure` here for symmetry with `api_kwargs_from_config`. - client_kwargs = normalize_api_kwargs( - version, discoveryServiceUrl, host, token, - insecure=insecure, - ) - else: - client_kwargs = api_kwargs_from_config(version) - return api_client( - **client_kwargs, + kwargs.update( cache=cache, + insecure=insecure, request_id=request_id, timeout=timeout, - **kwargs, ) + if discoveryServiceUrl or host or token: + kwargs.update(normalize_api_kwargs(version, discoveryServiceUrl, host, token)) + else: + kwargs.update(api_kwargs_from_config(version)) + version = kwargs.pop('version') + # We do the import here to avoid a circular import at the top level. + from .safeapi import ThreadSafeApiCache + return ThreadSafeApiCache({}, {}, kwargs, version) def api_from_config(version=None, apiconfig=None, **kwargs): """Build an Arvados API client from a configuration mapping @@ -421,6 +470,11 @@ def api_from_config(version=None, apiconfig=None, **kwargs): configuration. It accepts that mapping as an argument, so you can use a configuration that's different from what the user has set up. + This function returns an `arvados.safeapi.ThreadSafeApiCache`, an + API-compatible wrapper around `googleapiclient.discovery.Resource`. If + you're handling concurrency yourself and/or your application is very + performance-sensitive, consider calling `api_client` directly. + Arguments: version: str | None @@ -435,4 +489,4 @@ def api_from_config(version=None, apiconfig=None, **kwargs): Other arguments are passed directly to `api_client`.
See that function's docstring for more information about their meaning. """ - return api_client(**api_kwargs_from_config(version, apiconfig, **kwargs)) + return api(**api_kwargs_from_config(version, apiconfig, **kwargs))