+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from future import standard_library
+standard_library.install_aliases()
+from builtins import range
import collections
+import http.client
import httplib2
import json
import logging
import os
import re
import socket
+import time
import types
import apiclient
from apiclient import discovery as apiclient_discovery
from apiclient import errors as apiclient_errors
-import config
-import errors
-import util
+from . import config
+from . import errors
+from . import util
+from . import cache
_logger = logging.getLogger('arvados.api')
+MAX_IDLE_CONNECTION_DURATION = 30
+RETRY_DELAY_INITIAL = 2
+RETRY_DELAY_BACKOFF = 2
+RETRY_COUNT = 2
+
class OrderedJsonModel(apiclient.model.JsonModel):
"""Model class for JSON that preserves the contents' order.
return body
-def _intercept_http_request(self, uri, **kwargs):
- from httplib import BadStatusLine
-
+def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
if (self.max_request_size and
kwargs.get('body') and
self.max_request_size < len(kwargs['body'])):
raise apiclient_errors.MediaUploadSizeError("Request size %i bytes exceeds published limit of %i bytes" % (len(kwargs['body']), self.max_request_size))
- if 'headers' not in kwargs:
- kwargs['headers'] = {}
-
if config.get("ARVADOS_EXTERNAL_CLIENT", "") == "true":
- kwargs['headers']['X-External-Client'] = '1'
-
- kwargs['headers']['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
- try:
- return self.orig_http_request(uri, **kwargs)
- except BadStatusLine:
- # This is how httplib tells us that it tried to reuse an
- # existing connection but it was already closed by the
- # server. In that case, yes, we would like to retry.
- # Unfortunately, we are not absolutely certain that the
- # previous call did not succeed, so this is slightly
- # risky.
- return self.orig_http_request(uri, **kwargs)
- except socket.error:
- # This is the one case where httplib2 doesn't close the
- # underlying connection first. Close all open connections,
- # expecting this object only has the one connection to the API
- # server. This is safe because httplib2 reopens connections when
- # needed.
- _logger.debug("Retrying API request after socket error", exc_info=True)
- for conn in self.connections.itervalues():
+ headers['X-External-Client'] = '1'
+
+ headers['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
+ if not headers.get('X-Request-Id'):
+ headers['X-Request-Id'] = self._request_id()
+
+ retryable = method in [
+ 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT']
+ retry_count = self._retry_count if retryable else 0
+
+ if (not retryable and
+ time.time() - self._last_request_time > self._max_keepalive_idle):
+ # High probability of failure due to connection atrophy. Make
+ # sure this request [re]opens a new connection by closing and
+ # forgetting all cached connections first.
+ for conn in self.connections.values():
conn.close()
- return self.orig_http_request(uri, **kwargs)
+ self.connections.clear()
+
+ delay = self._retry_delay_initial
+ for _ in range(retry_count):
+ self._last_request_time = time.time()
+ try:
+ return self.orig_http_request(uri, method, headers=headers, **kwargs)
+ except http.client.HTTPException:
+ _logger.debug("Retrying API request in %d s after HTTP error",
+ delay, exc_info=True)
+ except socket.error:
+ # This is the one case where httplib2 doesn't close the
+ # underlying connection first. Close all open
+ # connections, expecting this object only has the one
+ # connection to the API server. This is safe because
+ # httplib2 reopens connections when needed.
+ _logger.debug("Retrying API request in %d s after socket error",
+ delay, exc_info=True)
+ for conn in self.connections.values():
+ conn.close()
+ except httplib2.SSLHandshakeError as e:
+ # Intercept and re-raise with a better error message.
+ raise httplib2.SSLHandshakeError("Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e))
+
+ time.sleep(delay)
+ delay = delay * self._retry_delay_backoff
+
+ self._last_request_time = time.time()
+ return self.orig_http_request(uri, method, headers=headers, **kwargs)
def _patch_http_request(http, api_token):
http.arvados_api_token = api_token
http.max_request_size = 0
http.orig_http_request = http.request
http.request = types.MethodType(_intercept_http_request, http)
+ http._last_request_time = 0
+ http._max_keepalive_idle = MAX_IDLE_CONNECTION_DURATION
+ http._retry_delay_initial = RETRY_DELAY_INITIAL
+ http._retry_delay_backoff = RETRY_DELAY_BACKOFF
+ http._retry_count = RETRY_COUNT
+ http._request_id = util.new_request_id
return http
# Monkey patch discovery._cast() so objects and arrays get serialized
def _cast_objects_too(value, schema_type):
global _cast_orig
if (type(value) != type('') and
+ type(value) != type(b'') and
(schema_type == 'object' or schema_type == 'array')):
return json.dumps(value)
else:
try:
util.mkdir_dash_p(path)
except OSError:
- path = None
- return path
+ return None
+ return cache.SafeHTTPCache(path, max_age=60*60*24*2)
-def api(version=None, cache=True, host=None, token=None, insecure=False, **kwargs):
+def api(version=None, cache=True, host=None, token=None, insecure=False,
+ request_id=None, timeout=5*60, **kwargs):
"""Return an apiclient Resources object for an Arvados instance.
:version:
:insecure:
If True, ignore SSL certificate validation errors.
+ :timeout:
+ A timeout value for http requests.
+
+ :request_id:
+ Default X-Request-Id header value for outgoing requests that
+ don't already provide one. If None or omitted, generate a random
+ ID. When retrying failed requests, the same ID is used on all
+ attempts.
+
Additional keyword arguments will be passed directly to
`apiclient_discovery.build` if a new Resource object is created.
If the `discoveryServiceUrl` or `http` keyword arguments are
elif host and token:
pass
elif not host and not token:
- return api_from_config(version=version, cache=cache, **kwargs)
+ return api_from_config(
+ version=version, cache=cache, request_id=request_id, **kwargs)
else:
# Caller provided one but not the other
if not host:
http_kwargs['disable_ssl_certificate_validation'] = True
kwargs['http'] = httplib2.Http(**http_kwargs)
+ if kwargs['http'].timeout is None:
+ kwargs['http'].timeout = timeout
+
kwargs['http'] = _patch_http_request(kwargs['http'], token)
- svc = apiclient_discovery.build('arvados', version, **kwargs)
+ svc = apiclient_discovery.build('arvados', version, cache_discovery=False, **kwargs)
svc.api_token = token
+ svc.insecure = insecure
+ svc.request_id = request_id
kwargs['http'].max_request_size = svc._rootDesc.get('maxRequestSize', 0)
kwargs['http'].cache = None
+ kwargs['http']._request_id = lambda: svc.request_id or util.new_request_id()
return svc
def api_from_config(version=None, apiconfig=None, **kwargs):
if apiconfig is None:
apiconfig = config.settings()
+ errors = []
for x in ['ARVADOS_API_HOST', 'ARVADOS_API_TOKEN']:
if x not in apiconfig:
- raise ValueError("%s is not set. Aborting." % x)
+ errors.append(x)
+ if errors:
+ raise ValueError(" and ".join(errors)+" not set.\nPlease set in %s or export environment variable." % config.default_config_file)
host = apiconfig.get('ARVADOS_API_HOST')
token = apiconfig.get('ARVADOS_API_TOKEN')
insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE', apiconfig)