import gflags
-import httplib
-import httplib2
import logging
import os
import pprint
import datetime
import ssl
import socket
+import requests
import arvados
import arvados.config as config
self._cache_lock.release()
class KeepClient(object):
+
+ # Default Keep server connection timeout: 2 seconds
+ # Default Keep server read timeout: 300 seconds
+ # Default Keep proxy connection timeout: 20 seconds
+ # Default Keep proxy read timeout: 300 seconds
+ DEFAULT_TIMEOUT = (2, 300)
+ DEFAULT_PROXY_TIMEOUT = (20, 300)
+
class ThreadLimiter(object):
"""
Limit the number of threads running at a given time to
class KeepService(object):
# Make requests to a single Keep service, and track results.
- HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+ HTTP_ERRORS = (requests.exceptions.RequestException,
socket.error, ssl.SSLError)
def __init__(self, root, **headers):
def last_status(self):
try:
- return int(self.last_result[0].status)
- except (AttributeError, IndexError, ValueError, TypeError):
+ return self.last_result.status_code
+ except AttributeError:
return None
- def get(self, http, locator):
- # http is an httplib2.Http object.
+ def get(self, locator, timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
try:
with timer.Timer() as t:
- result = http.request(url.encode('utf-8'), 'GET',
- headers=self.get_headers)
+ result = requests.get(url.encode('utf-8'),
+ headers=self.get_headers,
+ timeout=timeout)
except self.HTTP_ERRORS as e:
_logger.debug("Request fail: GET %s => %s: %s",
url, type(e), str(e))
else:
self.last_result = result
self.success_flag = retry.check_http_response_success(result)
- content = result[1]
+ content = result.content
_logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
self.last_status(), len(content), t.msecs,
(len(content)/(1024.0*1024))/t.secs)
url, resp_md5)
return None
- def put(self, http, hash_s, body):
+ def put(self, hash_s, body, timeout=None):
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
try:
- result = http.request(url.encode('utf-8'), 'PUT',
- headers=self.put_headers, body=body)
+ result = requests.put(url.encode('utf-8'),
+ data=body,
+ headers=self.put_headers,
+ timeout=timeout)
except self.HTTP_ERRORS as e:
_logger.debug("Request fail: PUT %s => %s: %s",
url, type(e), str(e))
str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root'])
- h = httplib2.Http(timeout=self.args.get('timeout', None))
self._success = bool(self.service.put(
- h, self.args['data_hash'], self.args['data']))
+ self.args['data_hash'],
+ self.args['data'],
+ timeout=self.args.get('timeout', None)))
status = self.service.last_status()
if self._success:
- resp, body = self.service.last_result
+ result = self.service.last_result
_logger.debug("KeepWriterThread %s succeeded %s %s",
str(threading.current_thread()),
self.args['data_hash'],
# we're talking to a proxy or other backend that
# stores to multiple copies for us.
try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
+ replicas_stored = int(result.headers['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(body.strip(), replicas_stored)
+ limiter.save_response(result.text.strip(), replicas_stored)
elif status is not None:
_logger.debug("Request fail: PUT %s => %s %s",
self.args['data_hash'], status,
- self.service.last_result[1])
+ self.service.last_result.text)
- def __init__(self, api_client=None, proxy=None, timeout=300,
+ def __init__(self, api_client=None, proxy=None,
+ timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
num_retries=0):
"""Initialize a new KeepClient.
Keep proxy. Otherwise, KeepClient will fall back to the setting
of the ARVADOS_KEEP_PROXY configuration setting. If you want to
ensure KeepClient does not use a proxy, pass in an empty string.
- * timeout: The timeout for all HTTP requests, in seconds. Default
- 300.
+ * timeout: The timeout (in seconds) for HTTP requests to Keep
+ non-proxy servers. A tuple of two floats is interpreted as
+ (connection_timeout, read_timeout); a single float is used for
+ both. See
+ http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+ Default: (2, 300).
+ * proxy_timeout: The timeout (in seconds) for HTTP requests to
+ Keep proxies. A tuple of two floats is interpreted as
+ (connection_timeout, read_timeout); a single float is used for
+ both. Default: (20, 300).
* api_token: If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
local_store = os.environ.get('KEEP_LOCAL_STORE')
self.block_cache = block_cache if block_cache else KeepBlockCache()
+ self.timeout = timeout
+ self.proxy_timeout = proxy_timeout
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
- self.timeout = timeout
self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
self.using_proxy = None
self._static_services_list = False
+ def current_timeout(self):
+ """Return the appropriate timeout to use for this client: the proxy
+ timeout setting if the backend service is currently a proxy,
+ the regular timeout setting otherwise.
+ """
+ # TODO(twp): the timeout should be a property of a
+ # KeepService, not a KeepClient. See #4488.
+ return self.proxy_timeout if self.using_proxy else self.timeout
+
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
(self._keep_services and not force_rebuild)):
services_to_try = [roots_map[root]
for root in (local_roots + hint_roots)
if roots_map[root].usable()]
- http = httplib2.Http(timeout=self.timeout)
for keep_service in services_to_try:
- blob = keep_service.get(http, locator)
+ blob = keep_service.get(locator, timeout=self.current_timeout())
if blob is not None:
break
loop.save_result((blob, len(services_to_try)))
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.timeout)
+ timeout=self.current_timeout())
t.start()
threads.append(t)
for t in threads: