Merge branch '20647-cr-logs-preflight'
[arvados.git] / sdk / python / arvados / api.py
index 537ad2082029333192302a8deb4bec24d5032d27..a7f3837599c20d82df65a50e6e139d89629f56b5 100644 (file)
@@ -23,18 +23,23 @@ import re
 import socket
 import ssl
 import sys
+import threading
 import time
 import types
 
 import apiclient
+import apiclient.http
 from apiclient import discovery as apiclient_discovery
 from apiclient import errors as apiclient_errors
 from . import config
 from . import errors
+from . import retry
 from . import util
 from . import cache
+from .logging import GoogleHTTPClientFilter, log_handler
 
 _logger = logging.getLogger('arvados.api')
+_googleapiclient_log_lock = threading.Lock()
 
 MAX_IDLE_CONNECTION_DURATION = 30
 
@@ -45,6 +50,10 @@ RETRY_DELAY_INITIAL = 0
 RETRY_DELAY_BACKOFF = 0
 RETRY_COUNT = 0
 
+# An unused HTTP 5xx status code to request a retry internally.
+# See _intercept_http_request. This should not be user-visible.
+_RETRY_4XX_STATUS = 545
+
 if sys.version_info >= (3,):
     httplib2.SSLHandshakeError = None
 
@@ -68,6 +77,24 @@ class OrderedJsonModel(apiclient.model.JsonModel):
         return body
 
 
+_orig_retry_request = apiclient.http._retry_request
+def _retry_request(http, num_retries, *args, **kwargs):
+    try:
+        num_retries = max(num_retries, http.num_retries)
+    except AttributeError:
+        # `http` client object does not have a `num_retries` attribute.
+        # It apparently hasn't gone through _patch_http_request, possibly
+        # because this isn't an Arvados API client. Pass through to
+        # avoid interfering with other Google API clients.
+        return _orig_retry_request(http, num_retries, *args, **kwargs)
+    response, body = _orig_retry_request(http, num_retries, *args, **kwargs)
+    # If _intercept_http_request ran out of retries for a 4xx response,
+    # restore the original status code.
+    if response.status == _RETRY_4XX_STATUS:
+        response.status = int(response['status'])
+    return (response, body)
+apiclient.http._retry_request = _retry_request
+
 def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
     if not headers.get('X-Request-Id'):
         headers['X-Request-Id'] = self._request_id()
@@ -89,9 +116,15 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
 
         self._last_request_time = time.time()
         try:
-            return self.orig_http_request(uri, method, headers=headers, **kwargs)
+            response, body = self.orig_http_request(uri, method, headers=headers, **kwargs)
         except ssl.SSLCertVerificationError as e:
             raise ssl.SSLCertVerificationError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e)) from None
+        # googleapiclient only retries 403, 429, and 5xx status codes.
+        # If we got another 4xx status that we want to retry, convert it into
+        # 5xx so googleapiclient handles it the way we want.
+        if response.status in retry._HTTP_CAN_RETRY and response.status < 500:
+            response.status = _RETRY_4XX_STATUS
+        return (response, body)
     except Exception as e:
         # Prepend "[request_id] " to the error message, which we
         # assume is the first string argument passed to the exception
@@ -102,9 +135,10 @@ def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
                 raise type(e)(*e.args)
         raise
 
-def _patch_http_request(http, api_token):
+def _patch_http_request(http, api_token, num_retries):
     http.arvados_api_token = api_token
     http.max_request_size = 0
+    http.num_retries = num_retries
     http.orig_http_request = http.request
     http.request = types.MethodType(_intercept_http_request, http)
     http._last_request_time = 0
@@ -157,6 +191,7 @@ def api_client(
         cache=True,
         http=None,
         insecure=False,
+        num_retries=10,
         request_id=None,
         timeout=5*60,
         **kwargs,
@@ -194,6 +229,10 @@ def api_client(
     insecure: bool
     : If true, ignore SSL certificate validation errors. Default `False`.
 
+    num_retries: int
+    : The number of times to retry each API request if it encounters a
+      temporary failure. Default 10.
+
     request_id: str | None
     : Default `X-Request-Id` header value for outgoing requests that
       don't already provide one. If `None` or omitted, generate a random
@@ -214,15 +253,46 @@ def api_client(
         )
     if http.timeout is None:
         http.timeout = timeout
-    http = _patch_http_request(http, token)
-
-    svc = apiclient_discovery.build(
-        'arvados', version,
-        cache_discovery=False,
-        discoveryServiceUrl=discoveryServiceUrl,
-        http=http,
-        **kwargs,
+    http = _patch_http_request(http, token, num_retries)
+
+    # The first time a client is instantiated, temporarily route
+    # googleapiclient.http retry logs if they're not already. These are
+    # important because temporary problems fetching the discovery document
+    # can cause clients to appear to hang early. This can be removed after
+    # we have a more general story for handling googleapiclient logs (#20521).
+    client_logger = logging.getLogger('googleapiclient.http')
+    # "first time a client is instantiated" = thread that acquires this lock
+    # It is never released.
+    # googleapiclient sets up its own NullHandler so we detect if logging is
+    # configured by looking for a real handler anywhere in the hierarchy.
+    client_logger_unconfigured = _googleapiclient_log_lock.acquire(blocking=False) and all(
+        isinstance(handler, logging.NullHandler)
+        for logger_name in ['', 'googleapiclient', 'googleapiclient.http']
+        for handler in logging.getLogger(logger_name).handlers
     )
+    if client_logger_unconfigured:
+        client_level = client_logger.level
+        client_filter = GoogleHTTPClientFilter()
+        client_logger.addFilter(client_filter)
+        client_logger.addHandler(log_handler)
+        if logging.NOTSET < client_level < client_filter.retry_levelno:
+            client_logger.setLevel(client_level)
+        else:
+            client_logger.setLevel(client_filter.retry_levelno)
+    try:
+        svc = apiclient_discovery.build(
+            'arvados', version,
+            cache_discovery=False,
+            discoveryServiceUrl=discoveryServiceUrl,
+            http=http,
+            num_retries=num_retries,
+            **kwargs,
+        )
+    finally:
+        if client_logger_unconfigured:
+            client_logger.removeHandler(log_handler)
+            client_logger.removeFilter(client_filter)
+            client_logger.setLevel(client_level)
     svc.api_token = token
     svc.insecure = insecure
     svc.request_id = request_id