X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/35895ee91c820680bb7df9696ab2e92525ead2ac..540b72d62a94015f116ba077e279a5f10d666778:/sdk/python/arvados/api.py

diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index 537ad20820..a7f3837599 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -23,18 +23,23 @@ import re
 import socket
 import ssl
 import sys
+import threading
 import time
 import types
 
 import apiclient
+import apiclient.http
 from apiclient import discovery as apiclient_discovery
 from apiclient import errors as apiclient_errors
 from . import config
 from . import errors
+from . import retry
 from . import util
 from . import cache
+from .logging import GoogleHTTPClientFilter, log_handler
 
 _logger = logging.getLogger('arvados.api')
+_googleapiclient_log_lock = threading.Lock()
 
 MAX_IDLE_CONNECTION_DURATION = 30
 
@@ -45,6 +50,10 @@ RETRY_DELAY_INITIAL = 0
 RETRY_DELAY_BACKOFF = 0
 RETRY_COUNT = 0
 
+# An unused HTTP 5xx status code to request a retry internally.
+# See _intercept_http_request. This should not be user-visible.
+_RETRY_4XX_STATUS = 545
+
 if sys.version_info >= (3,):
     httplib2.SSLHandshakeError = None
 
@@ -68,6 +77,24 @@ class OrderedJsonModel(apiclient.model.JsonModel):
         return body
 
 
+_orig_retry_request = apiclient.http._retry_request
+def _retry_request(http, num_retries, *args, **kwargs):
+    try:
+        num_retries = max(num_retries, http.num_retries)
+    except AttributeError:
+        # `http` client object does not have a `num_retries` attribute.
+        # It apparently hasn't gone through _patch_http_request, possibly
+        # because this isn't an Arvados API client. Pass through to
+        # avoid interfering with other Google API clients.
+        return _orig_retry_request(http, num_retries, *args, **kwargs)
+    response, body = _orig_retry_request(http, num_retries, *args, **kwargs)
+    # If _intercept_http_request ran out of retries for a 4xx response,
+    # restore the original status code.
+    if response.status == _RETRY_4XX_STATUS:
+        response.status = int(response['status'])
+    return (response, body)
+apiclient.http._retry_request = _retry_request
+
 def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
     if not headers.get('X-Request-Id'):
         headers['X-Request-Id'] = self._request_id()
@@ -89,9 +116,15 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
 
         self._last_request_time = time.time()
         try:
-            return self.orig_http_request(uri, method, headers=headers, **kwargs)
+            response, body = self.orig_http_request(uri, method, headers=headers, **kwargs)
         except ssl.SSLCertVerificationError as e:
             raise ssl.SSLCertVerificationError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e)) from None
+        # googleapiclient only retries 403, 429, and 5xx status codes.
+        # If we got another 4xx status that we want to retry, convert it into
+        # 5xx so googleapiclient handles it the way we want.
+        if response.status in retry._HTTP_CAN_RETRY and response.status < 500:
+            response.status = _RETRY_4XX_STATUS
+        return (response, body)
     except Exception as e:
         # Prepend "[request_id] " to the error message, which we
         # assume is the first string argument passed to the exception
@@ -102,9 +135,10 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
                 raise type(e)(*e.args)
         raise
 
-def _patch_http_request(http, api_token):
+def _patch_http_request(http, api_token, num_retries):
     http.arvados_api_token = api_token
     http.max_request_size = 0
+    http.num_retries = num_retries
     http.orig_http_request = http.request
     http.request = types.MethodType(_intercept_http_request, http)
     http._last_request_time = 0
@@ -157,6 +191,7 @@ def api_client(
         cache=True,
         http=None,
         insecure=False,
+        num_retries=10,
         request_id=None,
         timeout=5*60,
         **kwargs,
@@ -194,6 +229,10 @@ def api_client(
     insecure: bool
     : If true, ignore SSL certificate validation errors. Default `False`.
 
+    num_retries: int
+    : The number of times to retry each API request if it encounters a
+      temporary failure. Default 10.
+
     request_id: str | None
     : Default `X-Request-Id` header value for outgoing requests that
       don't already provide one. If `None` or omitted, generate a random
@@ -214,15 +253,46 @@ def api_client(
         )
     if http.timeout is None:
         http.timeout = timeout
-    http = _patch_http_request(http, token)
-
-    svc = apiclient_discovery.build(
-        'arvados', version,
-        cache_discovery=False,
-        discoveryServiceUrl=discoveryServiceUrl,
-        http=http,
-        **kwargs,
+    http = _patch_http_request(http, token, num_retries)
+
+    # The first time a client is instantiated, temporarily route
+    # googleapiclient.http retry logs if they're not already. These are
+    # important because temporary problems fetching the discovery document
+    # can cause clients to appear to hang early. This can be removed after
+    # we have a more general story for handling googleapiclient logs (#20521).
+    client_logger = logging.getLogger('googleapiclient.http')
+    # "first time a client is instantiated" = thread that acquires this lock
+    # It is never released.
+    # googleapiclient sets up its own NullHandler so we detect if logging is
+    # configured by looking for a real handler anywhere in the hierarchy.
+    client_logger_unconfigured = _googleapiclient_log_lock.acquire(blocking=False) and all(
+        isinstance(handler, logging.NullHandler)
+        for logger_name in ['', 'googleapiclient', 'googleapiclient.http']
+        for handler in logging.getLogger(logger_name).handlers
     )
+    if client_logger_unconfigured:
+        client_level = client_logger.level
+        client_filter = GoogleHTTPClientFilter()
+        client_logger.addFilter(client_filter)
+        client_logger.addHandler(log_handler)
+        if logging.NOTSET < client_level < client_filter.retry_levelno:
+            client_logger.setLevel(client_level)
+        else:
+            client_logger.setLevel(client_filter.retry_levelno)
+    try:
+        svc = apiclient_discovery.build(
+            'arvados', version,
+            cache_discovery=False,
+            discoveryServiceUrl=discoveryServiceUrl,
+            http=http,
+            num_retries=num_retries,
+            **kwargs,
+        )
+    finally:
+        if client_logger_unconfigured:
+            client_logger.removeHandler(log_handler)
+            client_logger.removeFilter(client_filter)
+            client_logger.setLevel(client_level)
     svc.api_token = token
     svc.insecure = insecure
     svc.request_id = request_id