Merge branch '20647-cr-logs-preflight'
[arvados.git] / sdk / python / arvados / api.py
index 19154f3e8b368f5b0dbeb631e4290ac7f32d101f..a7f3837599c20d82df65a50e6e139d89629f56b5 100644 (file)
@@ -23,23 +23,36 @@ import re
 import socket
 import ssl
 import sys
+import threading
 import time
 import types
 
 import apiclient
+import apiclient.http
 from apiclient import discovery as apiclient_discovery
 from apiclient import errors as apiclient_errors
 from . import config
 from . import errors
+from . import retry
 from . import util
 from . import cache
+from .logging import GoogleHTTPClientFilter, log_handler
 
 _logger = logging.getLogger('arvados.api')
+_googleapiclient_log_lock = threading.Lock()
 
 MAX_IDLE_CONNECTION_DURATION = 30
-RETRY_DELAY_INITIAL = 2
-RETRY_DELAY_BACKOFF = 2
-RETRY_COUNT = 2
+
+# These constants supported our own retry logic that we've since removed in
+# favor of using googleapiclient's num_retries. They're kept here purely for
+# API compatibility, but set to 0 to indicate no retries happen.
+RETRY_DELAY_INITIAL = 0
+RETRY_DELAY_BACKOFF = 0
+RETRY_COUNT = 0
+
+# An unused HTTP 5xx status code to request a retry internally.
+# See _intercept_http_request. This should not be user-visible.
+_RETRY_4XX_STATUS = 545
 
 if sys.version_info >= (3,):
     httplib2.SSLHandshakeError = None
@@ -64,6 +77,24 @@ class OrderedJsonModel(apiclient.model.JsonModel):
         return body
 
 
+_orig_retry_request = apiclient.http._retry_request
+def _retry_request(http, num_retries, *args, **kwargs):
+    try:
+        num_retries = max(num_retries, http.num_retries)
+    except AttributeError:
+        # `http` client object does not have a `num_retries` attribute.
+        # It apparently hasn't gone through _patch_http_request, possibly
+        # because this isn't an Arvados API client. Pass through to
+        # avoid interfering with other Google API clients.
+        return _orig_retry_request(http, num_retries, *args, **kwargs)
+    response, body = _orig_retry_request(http, num_retries, *args, **kwargs)
+    # If _intercept_http_request ran out of retries for a 4xx response,
+    # restore the original status code.
+    if response.status == _RETRY_4XX_STATUS:
+        response.status = int(response['status'])
+    return (response, body)
+apiclient.http._retry_request = _retry_request
+
 def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
     if not headers.get('X-Request-Id'):
         headers['X-Request-Id'] = self._request_id()
@@ -75,12 +106,7 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
 
         headers['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
 
-        retryable = method in [
-            'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT']
-        retry_count = self._retry_count if retryable else 0
-
-        if (not retryable and
-            time.time() - self._last_request_time > self._max_keepalive_idle):
+        if (time.time() - self._last_request_time) > self._max_keepalive_idle:
             # High probability of failure due to connection atrophy. Make
             # sure this request [re]opens a new connection by closing and
             # forgetting all cached connections first.
@@ -88,32 +114,17 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
                 conn.close()
             self.connections.clear()
 
-        delay = self._retry_delay_initial
-        for _ in range(retry_count):
-            self._last_request_time = time.time()
-            try:
-                return self.orig_http_request(uri, method, headers=headers, **kwargs)
-            except http.client.HTTPException:
-                _logger.debug("[%s] Retrying API request in %d s after HTTP error",
-                              headers['X-Request-Id'], delay, exc_info=True)
-            except ssl.SSLCertVerificationError as e:
-                raise ssl.SSLCertVerificationError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e)) from None
-            except socket.error:
-                # This is the one case where httplib2 doesn't close the
-                # underlying connection first.  Close all open
-                # connections, expecting this object only has the one
-                # connection to the API server.  This is safe because
-                # httplib2 reopens connections when needed.
-                _logger.debug("[%s] Retrying API request in %d s after socket error",
-                              headers['X-Request-Id'], delay, exc_info=True)
-                for conn in self.connections.values():
-                    conn.close()
-
-            time.sleep(delay)
-            delay = delay * self._retry_delay_backoff
-
         self._last_request_time = time.time()
-        return self.orig_http_request(uri, method, headers=headers, **kwargs)
+        try:
+            response, body = self.orig_http_request(uri, method, headers=headers, **kwargs)
+        except ssl.SSLCertVerificationError as e:
+            raise ssl.SSLCertVerificationError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e)) from None
+        # googleapiclient only retries 403, 429, and 5xx status codes.
+        # If we got another 4xx status that we want to retry, convert it into
+        # 5xx so googleapiclient handles it the way we want.
+        if response.status in retry._HTTP_CAN_RETRY and response.status < 500:
+            response.status = _RETRY_4XX_STATUS
+        return (response, body)
     except Exception as e:
         # Prepend "[request_id] " to the error message, which we
         # assume is the first string argument passed to the exception
@@ -124,16 +135,14 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
                 raise type(e)(*e.args)
         raise
 
-def _patch_http_request(http, api_token):
+def _patch_http_request(http, api_token, num_retries):
     http.arvados_api_token = api_token
     http.max_request_size = 0
+    http.num_retries = num_retries
     http.orig_http_request = http.request
     http.request = types.MethodType(_intercept_http_request, http)
     http._last_request_time = 0
     http._max_keepalive_idle = MAX_IDLE_CONNECTION_DURATION
-    http._retry_delay_initial = RETRY_DELAY_INITIAL
-    http._retry_delay_backoff = RETRY_DELAY_BACKOFF
-    http._retry_count = RETRY_COUNT
     http._request_id = util.new_request_id
     return http
 
@@ -182,6 +191,7 @@ def api_client(
         cache=True,
         http=None,
         insecure=False,
+        num_retries=10,
         request_id=None,
         timeout=5*60,
         **kwargs,
@@ -219,6 +229,10 @@ def api_client(
     insecure: bool
     : If true, ignore SSL certificate validation errors. Default `False`.
 
+    num_retries: int
+    : The number of times to retry each API request if it encounters a
+      temporary failure. Default 10.
+
     request_id: str | None
     : Default `X-Request-Id` header value for outgoing requests that
       don't already provide one. If `None` or omitted, generate a random
@@ -239,15 +253,46 @@ def api_client(
         )
     if http.timeout is None:
         http.timeout = timeout
-    http = _patch_http_request(http, token)
-
-    svc = apiclient_discovery.build(
-        'arvados', version,
-        cache_discovery=False,
-        discoveryServiceUrl=discoveryServiceUrl,
-        http=http,
-        **kwargs,
+    http = _patch_http_request(http, token, num_retries)
+
+    # The first time a client is instantiated, temporarily route
+    # googleapiclient.http retry logs if they're not already. These are
+    # important because temporary problems fetching the discovery document
+    # can cause clients to appear to hang early. This can be removed after
+    # we have a more general story for handling googleapiclient logs (#20521).
+    client_logger = logging.getLogger('googleapiclient.http')
+    # "first time a client is instantiated" = thread that acquires this lock
+    # It is never released.
+    # googleapiclient sets up its own NullHandler so we detect if logging is
+    # configured by looking for a real handler anywhere in the hierarchy.
+    client_logger_unconfigured = _googleapiclient_log_lock.acquire(blocking=False) and all(
+        isinstance(handler, logging.NullHandler)
+        for logger_name in ['', 'googleapiclient', 'googleapiclient.http']
+        for handler in logging.getLogger(logger_name).handlers
     )
+    if client_logger_unconfigured:
+        client_level = client_logger.level
+        client_filter = GoogleHTTPClientFilter()
+        client_logger.addFilter(client_filter)
+        client_logger.addHandler(log_handler)
+        if logging.NOTSET < client_level < client_filter.retry_levelno:
+            client_logger.setLevel(client_level)
+        else:
+            client_logger.setLevel(client_filter.retry_levelno)
+    try:
+        svc = apiclient_discovery.build(
+            'arvados', version,
+            cache_discovery=False,
+            discoveryServiceUrl=discoveryServiceUrl,
+            http=http,
+            num_retries=num_retries,
+            **kwargs,
+        )
+    finally:
+        if client_logger_unconfigured:
+            client_logger.removeHandler(log_handler)
+            client_logger.removeFilter(client_filter)
+            client_logger.setLevel(client_level)
     svc.api_token = token
     svc.insecure = insecure
     svc.request_id = request_id