import gflags
-import httplib
-import httplib2
import logging
import os
import pprint
import datetime
import ssl
import socket
-
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
+import requests
import arvados
import arvados.config as config
import arvados.retry as retry
import arvados.util
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
self.ready.set()
def size(self):
+ """Return len(self.content), or 0 when self.content is None."""
- if self.content == None:
+ if self.content is None:
return 0
else:
return len(self.content)
self._cache_lock.release()
class KeepClient(object):
+
+ # Default Keep server connection timeout: 2 seconds
+ # Default Keep server read timeout: 300 seconds
+ # Default Keep proxy connection timeout: 20 seconds
+ # Default Keep proxy read timeout: 300 seconds
+ DEFAULT_TIMEOUT = (2, 300)
+ DEFAULT_PROXY_TIMEOUT = (20, 300)
+
class ThreadLimiter(object):
"""
Limit the number of threads running at a given time to
class KeepService(object):
# Make requests to a single Keep service, and track results.
- HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+ HTTP_ERRORS = (requests.exceptions.RequestException,
socket.error, ssl.SSLError)
def __init__(self, root, **headers):
def last_status(self):
+ """Return the status_code attribute of self.last_result.
+
+ Returns None when self.last_result has no status_code attribute
+ (presumably because no response has been recorded yet -- confirm
+ against how __init__ initializes last_result).
+ """
try:
- return int(self.last_result[0].status)
- except (AttributeError, IndexError, ValueError):
+ return self.last_result.status_code
+ except AttributeError:
return None
- def get(self, http, locator):
- # http is an httplib2.Http object.
+ def get(self, locator, timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
try:
with timer.Timer() as t:
- result = http.request(url.encode('utf-8'), 'GET',
- headers=self.get_headers)
+ result = requests.get(url.encode('utf-8'),
+ headers=self.get_headers,
+ timeout=timeout)
except self.HTTP_ERRORS as e:
_logger.debug("Request fail: GET %s => %s: %s",
url, type(e), str(e))
else:
self.last_result = result
self.success_flag = retry.check_http_response_success(result)
- content = result[1]
+ content = result.content
_logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
self.last_status(), len(content), t.msecs,
(len(content)/(1024.0*1024))/t.secs)
url, resp_md5)
return None
- def put(self, http, hash_s, body):
+ def put(self, hash_s, body, timeout=None):
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
try:
- result = http.request(url.encode('utf-8'), 'PUT',
- headers=self.put_headers, body=body)
+ result = requests.put(url.encode('utf-8'),
+ data=body,
+ headers=self.put_headers,
+ timeout=timeout)
except self.HTTP_ERRORS as e:
_logger.debug("Request fail: PUT %s => %s: %s",
url, type(e), str(e))
str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root'])
- h = httplib2.Http(timeout=self.args.get('timeout', None))
self._success = bool(self.service.put(
- h, self.args['data_hash'], self.args['data']))
+ self.args['data_hash'],
+ self.args['data'],
+ timeout=self.args.get('timeout', None)))
status = self.service.last_status()
if self._success:
- resp, body = self.service.last_result
+ result = self.service.last_result
_logger.debug("KeepWriterThread %s succeeded %s %s",
str(threading.current_thread()),
self.args['data_hash'],
# we're talking to a proxy or other backend that
# stores to multiple copies for us.
try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
+ replicas_stored = int(result.headers['x-keep-replicas-stored'])
except (KeyError, ValueError):
replicas_stored = 1
- limiter.save_response(body.strip(), replicas_stored)
+ limiter.save_response(result.text.strip(), replicas_stored)
elif status is not None:
_logger.debug("Request fail: PUT %s => %s %s",
self.args['data_hash'], status,
- self.service.last_result[1])
+ self.service.last_result.text)
- def __init__(self, api_client=None, proxy=None, timeout=300,
+ def __init__(self, api_client=None, proxy=None,
+ timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
num_retries=0):
"""Initialize a new KeepClient.
Keep proxy. Otherwise, KeepClient will fall back to the setting
of the ARVADOS_KEEP_PROXY configuration setting. If you want to
ensure KeepClient does not use a proxy, pass in an empty string.
- * timeout: The timeout for all HTTP requests, in seconds. Default
- 300.
+ * timeout: The timeout (in seconds) for HTTP requests to Keep
+ non-proxy servers. A tuple of two floats is interpreted as
+ (connection_timeout, read_timeout): see
+ http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+ Default: (2, 300).
+ * proxy_timeout: The timeout (in seconds) for HTTP requests to
+ Keep proxies. A tuple of two floats is interpreted as
+ (connection_timeout, read_timeout). Default: (20, 300).
* api_token: If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
if proxy is None:
proxy = config.get('ARVADOS_KEEP_PROXY')
if api_token is None:
- api_token = config.get('ARVADOS_API_TOKEN')
+ if api_client is None:
+ api_token = config.get('ARVADOS_API_TOKEN')
+ else:
+ api_token = api_client.api_token
elif api_client is not None:
raise ValueError(
"can't build KeepClient with both API client and token")
local_store = os.environ.get('KEEP_LOCAL_STORE')
self.block_cache = block_cache if block_cache else KeepBlockCache()
+ self.timeout = timeout
+ self.proxy_timeout = proxy_timeout
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
- self.timeout = timeout
self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
- self.service_roots = [proxy]
+ self._keep_services = [{
+ 'uuid': 'proxy',
+ '_service_root': proxy,
+ }]
self.using_proxy = True
- self.static_service_roots = True
+ self._static_services_list = True
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
- self.service_roots = None
+ self._keep_services = None
self.using_proxy = None
- self.static_service_roots = False
+ self._static_services_list = False
+
+    def current_timeout(self):
+        """Select the timeout setting that applies to this client.
+
+        Returns self.proxy_timeout when self.using_proxy is truthy, and
+        self.timeout otherwise.
+        """
+        # TODO(twp): the timeout should be a property of a
+        # KeepService, not a KeepClient. See #4488.
+        if self.using_proxy:
+            return self.proxy_timeout
+        return self.timeout
- def build_service_roots(self, force_rebuild=False):
- if (self.static_service_roots or
- (self.service_roots and not force_rebuild)):
+ def build_services_list(self, force_rebuild=False):
+ if (self._static_services_list or
+ (self._keep_services and not force_rebuild)):
return
with self.lock:
try:
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- keep_services = keep_services.execute().get('items')
- if not keep_services:
+ self._keep_services = keep_services.execute().get('items')
+ if not self._keep_services:
raise arvados.errors.NoKeepServersError()
self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in keep_services)
-
- roots = ("{}://[{}]:{:d}/".format(
- 'https' if ks['service_ssl_flag'] else 'http',
- ks['service_host'],
- ks['service_port'])
- for ks in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
-
- def shuffled_service_roots(self, hash, force_rebuild=False):
- self.build_service_roots(force_rebuild)
-
- # Build an ordering with which to query the Keep servers based on the
- # contents of the hash.
- # "hash" is a hex-encoded number at least 8 digits
- # (32 bits) long
-
- # seed used to calculate the next keep server from 'pool'
- # to be added to 'pseq'
- seed = hash
-
- # Keep servers still to be added to the ordering
- pool = self.service_roots[:]
-
- # output probe sequence
- pseq = []
-
- # iterate while there are servers left to be assigned
- while len(pool) > 0:
- if len(seed) < 8:
- # ran out of digits in the seed
- if len(pseq) < len(hash) / 4:
- # the number of servers added to the probe sequence is less
- # than the number of 4-digit slices in 'hash' so refill the
- # seed with the last 4 digits and then append the contents
- # of 'hash'.
- seed = hash[-4:] + hash
- else:
- # refill the seed with the contents of 'hash'
- seed += hash
-
- # Take the next 8 digits (32 bytes) and interpret as an integer,
- # then modulus with the size of the remaining pool to get the next
- # selected server.
- probe = int(seed[0:8], 16) % len(pool)
-
- # Append the selected server to the probe sequence and remove it
- # from the pool.
- pseq += [pool[probe]]
- pool = pool[:probe] + pool[probe+1:]
-
- # Remove the digits just used from the seed
- seed = seed[8:]
- _logger.debug(str(pseq))
- return pseq
+ for ks in self._keep_services)
+
+ # Precompute the base URI for each service.
+ for r in self._keep_services:
+ r['_service_root'] = "{}://[{}]:{:d}/".format(
+ 'https' if r['service_ssl_flag'] else 'http',
+ r['service_host'],
+ r['service_port'])
+ _logger.debug(str(self._keep_services))
+
+    def _service_weight(self, data_hash, service_uuid):
+        """Return the sort weight of a Keep service for a given block hash.
+
+        The weight is the hexadecimal MD5 digest of data_hash concatenated
+        with the final 15 characters of service_uuid.
+        """
+        uuid_suffix = service_uuid[-15:]
+        digest = hashlib.md5(data_hash + uuid_suffix)
+        return digest.hexdigest()
+    def weighted_service_roots(self, data_hash, force_rebuild=False):
+        """List Keep service base URIs in probe order for data_hash.
+
+        Calls build_services_list(force_rebuild) first, then ranks the
+        entries of self._keep_services by _service_weight(data_hash, uuid),
+        heaviest first, and returns each entry's '_service_root'.
+        """
+        self.build_services_list(force_rebuild)
+
+        def weight_of(svc):
+            # Ranking key: this service's weight for the requested block.
+            return self._service_weight(data_hash, svc['uuid'])
+
+        ranked = sorted(self._keep_services, key=weight_of, reverse=True)
+        sorted_roots = [svc['_service_root'] for svc in ranked]
+        _logger.debug(data_hash + ': ' + str(sorted_roots))
+        return sorted_roots
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
# new ones to roots_map. Return the current list of local
# root strings.
headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
- local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+ local_roots = self.weighted_service_roots(md5_s, force_rebuild)
for root in local_roots:
if root not in roots_map:
roots_map[root] = self.KeepService(root, **headers)
services_to_try = [roots_map[root]
for root in (local_roots + hint_roots)
if roots_map[root].usable()]
- http = httplib2.Http(timeout=self.timeout)
for keep_service in services_to_try:
- blob = keep_service.get(http, locator)
+ blob = keep_service.get(locator, timeout=self.current_timeout())
if blob is not None:
break
loop.save_result((blob, len(services_to_try)))
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.timeout)
+ timeout=self.current_timeout())
t.start()
threads.append(t)
for t in threads: