+import bz2
+import datetime
+import fcntl
+import functools
import gflags
-import httplib
-import httplib2
+import hashlib
+import json
import logging
import os
import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
import re
-import hashlib
+import socket
+import ssl
import string
-import bz2
-import zlib
-import fcntl
-import time
+import StringIO
+import subprocess
+import sys
import threading
+import time
import timer
-import datetime
-import ssl
-
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
+import types
+import UserDict
+import zlib
import arvados
import arvados.config as config
import arvados.errors
+import arvados.retry as retry
import arvados.util
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
self.size = None
for hint in pieces:
if self.HINT_RE.match(hint) is None:
- raise ValueError("unrecognized hint data {}".format(hint))
+ raise ValueError("invalid hint format: {}".format(hint))
elif hint.startswith('A'):
self.parse_permission_hint(hint)
else:
self.permission_hint()] + self.hints
if s is not None)
+ def stripped(self):
+     """Return this locator without any hints.
+
+     The result is the md5sum alone when no size is recorded, and
+     "<md5sum>+<size>" otherwise.
+     """
+     if self.size is None:
+         return self.md5sum
+     return "%s+%i" % (self.md5sum, self.size)
+
def _make_hex_prop(name, length):
# Build and return a new property with the given name that
# must be a hex string of the given length.
def put(data, **kwargs):
return Keep.global_client_object().put(data, **kwargs)
+class KeepBlockCache(object):
+    """In-memory cache of Keep blocks.
+
+    Slots are held in a list ordered from most to least recently used
+    (lookups move the matching slot to the front). Access to the list is
+    guarded by a lock.
+    """
+
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        """Create an empty cache.
+
+        :cache_max:
+          Upper bound on the summed len() of cached block contents,
+          enforced whenever cap_cache() is called.
+        """
+        self.cache_max = cache_max
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        """Holder for the content of one block, which may still be loading."""
+
+        def __init__(self, locator):
+            self.locator = locator
+            # Set once set() has been called, whether or not content was found.
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            """Block until set() has been called, then return the content."""
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            """Store the content (None means the read failed) and wake waiters."""
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            """Return len(content), or 0 while there is no content."""
+            if self.content is None:
+                return 0
+            else:
+                return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        with self._cache_lock:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum([slot.size() for slot in self._cache])
+            while len(self._cache) > 0 and sm > self.cache_max:
+                # Evict the least recently used slot that has finished loading.
+                for i in xrange(len(self._cache)-1, -1, -1):
+                    if self._cache[i].ready.is_set():
+                        del self._cache[i]
+                        break
+                else:
+                    # Nothing is evictable right now. Stop instead of
+                    # spinning with the lock held; the next cap_cache()
+                    # call will try again.
+                    break
+                sm = sum([slot.size() for slot in self._cache])
+
+    def _get(self, locator):
+        """Return the slot for locator, or None. Caller must hold the lock."""
+        # Test if the locator is already in the cache
+        for i in xrange(0, len(self._cache)):
+            if self._cache[i].locator == locator:
+                n = self._cache[i]
+                if i != 0:
+                    # move it to the front
+                    del self._cache[i]
+                    self._cache.insert(0, n)
+                return n
+        return None
+
+    def get(self, locator):
+        """Return the slot for locator (marking it most recently used), or None."""
+        with self._cache_lock:
+            return self._get(locator)
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator,
+        or return the existing slot.
+
+        Returns a (slot, created) pair; created is True only when a new,
+        not-yet-filled slot was added for this call.'''
+        with self._cache_lock:
+            n = self._get(locator)
+            if n:
+                return n, False
+            else:
+                # Add a new cache slot for the locator
+                n = KeepBlockCache.CacheSlot(locator)
+                self._cache.insert(0, n)
+                return n, True
class KeepClient(object):
+
+ # Default Keep server connection timeout: 2 seconds
+ # Default Keep server read timeout: 300 seconds
+ # Default Keep proxy connection timeout: 20 seconds
+ # Default Keep proxy read timeout: 300 seconds
+ DEFAULT_TIMEOUT = (2, 300)
+ DEFAULT_PROXY_TIMEOUT = (20, 300)
+
class ThreadLimiter(object):
"""
Limit the number of threads running at a given time to
return self._done
+ class KeepService(object):
+     """Make requests to a single Keep service, and track results.
+
+     A KeepService is intended to last long enough to perform one
+     transaction (GET or PUT) against one Keep service. This can
+     involve calling either get() or put() multiple times in order
+     to retry after transient failures. However, calling both get()
+     and put() on a single instance -- or using the same instance
+     to access two different Keep services -- will not produce
+     sensible behavior.
+     """
+
+     # Exception types that get() and put() record in last_result()
+     # instead of letting them propagate to the caller.
+     HTTP_ERRORS = (
+         socket.error,
+         ssl.SSLError,
+         arvados.errors.HttpError,
+     )
+
+     # NOTE: the default user_agent_pool is evaluated once, when this
+     # method is defined, so every KeepService created without an
+     # explicit pool shares that one queue.
+     def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
+         """Prepare to talk to the Keep service at base URL `root`.
+
+         :root:
+           Base URL of the service; locators are appended to it directly.
+
+         :user_agent_pool:
+           Queue of reusable pycurl.Curl handles.
+
+         :headers:
+           Extra HTTP headers sent with every request. GET requests
+           additionally send "Accept: application/octet-stream".
+         """
+         self.root = root
+         self._user_agent_pool = user_agent_pool
+         self._result = {'error': None}
+         self._usable = True
+         self._session = None
+         self.get_headers = {'Accept': 'application/octet-stream'}
+         self.get_headers.update(headers)
+         self.put_headers = headers
+
+     def usable(self):
+         """Is it worth attempting a request?"""
+         return self._usable
+
+     def finished(self):
+         """Did the request succeed or encounter permanent failure?"""
+         return self._result['error'] == False or not self._usable
+
+     def last_result(self):
+         """Return the dict describing the most recent request.
+
+         It always has an 'error' key; after a completed HTTP exchange it
+         also has 'status_code', 'body' and 'headers'.
+         """
+         return self._result
+
+     def _get_user_agent(self):
+         """Take a curl handle from the pool, or create one if it is empty."""
+         try:
+             return self._user_agent_pool.get(False)
+         except Queue.Empty:
+             return pycurl.Curl()
+
+     def _put_user_agent(self, ua):
+         """Reset a curl handle and return it to the pool.
+
+         If the handle cannot be reset or queued it is closed instead.
+         """
+         try:
+             ua.reset()
+             self._user_agent_pool.put(ua, False)
+         except Exception:
+             # Best effort: a handle we cannot recycle is discarded. Only
+             # ordinary errors are swallowed, so KeyboardInterrupt and
+             # SystemExit still propagate.
+             ua.close()
+
+     def _socket_open(self, family, socktype, protocol, address):
+         """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+         s = socket.socket(family, socktype, protocol)
+         s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+         s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+         s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+         return s
+
+     def get(self, locator, timeout=None):
+         """Fetch one block from this service.
+
+         :locator:
+           A KeepLocator object; str(locator) is appended to the root URL.
+
+         :timeout:
+           Passed to _setcurltimeouts().
+
+         Returns the response body when the request succeeds and the
+         body's md5 matches locator.md5sum; otherwise returns None and
+         leaves the error in last_result()['error'].
+         """
+         # locator is a KeepLocator object.
+         url = self.root + str(locator)
+         _logger.debug("Request: GET %s", url)
+         curl = self._get_user_agent()
+         try:
+             with timer.Timer() as t:
+                 self._headers = {}
+                 response_body = StringIO.StringIO()
+                 curl.setopt(pycurl.NOSIGNAL, 1)
+                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                 curl.setopt(pycurl.URL, url.encode('utf-8'))
+                 curl.setopt(pycurl.HTTPHEADER, [
+                     '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                 self._setcurltimeouts(curl, timeout)
+                 try:
+                     curl.perform()
+                 except Exception as e:
+                     # Normalize every transport failure to HttpError so
+                     # the HTTP_ERRORS handler below records it.
+                     raise arvados.errors.HttpError(0, str(e))
+                 self._result = {
+                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                     'body': response_body.getvalue(),
+                     'headers': self._headers,
+                     'error': False,
+                 }
+             ok = retry.check_http_response_success(self._result['status_code'])
+             if not ok:
+                 self._result['error'] = arvados.errors.HttpError(
+                     self._result['status_code'],
+                     self._headers.get('x-status-line', 'Error'))
+         except self.HTTP_ERRORS as e:
+             self._result = {
+                 'error': e,
+             }
+             ok = False
+         self._usable = ok != False
+         if self._result.get('status_code', None):
+             # The client worked well enough to get an HTTP status
+             # code, so presumably any problems are just on the
+             # server side and it's OK to reuse the client.
+             self._put_user_agent(curl)
+         else:
+             # Don't return this client to the pool, in case it's
+             # broken.
+             curl.close()
+         if not ok:
+             _logger.debug("Request fail: GET %s => %s: %s",
+                           url, type(self._result['error']), str(self._result['error']))
+             return None
+         _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+                      self._result['status_code'],
+                      len(self._result['body']),
+                      t.msecs,
+                      (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+         resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+         if resp_md5 != locator.md5sum:
+             _logger.warning("Checksum fail: md5(%s) = %s",
+                             url, resp_md5)
+             self._result['error'] = arvados.errors.HttpError(
+                 0, 'Checksum fail')
+             return None
+         return self._result['body']
+
+     def put(self, hash_s, body, timeout=None):
+         """Store one block on this service.
+
+         :hash_s:
+           String appended to the root URL to form the request URL.
+
+         :body:
+           The data to send as the request body.
+
+         :timeout:
+           Passed to _setcurltimeouts().
+
+         Returns True when the request succeeds, False otherwise; the
+         details are available from last_result().
+         """
+         url = self.root + hash_s
+         _logger.debug("Request: PUT %s", url)
+         curl = self._get_user_agent()
+         try:
+             self._headers = {}
+             response_body = StringIO.StringIO()
+             curl.setopt(pycurl.NOSIGNAL, 1)
+             curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+             curl.setopt(pycurl.URL, url.encode('utf-8'))
+             curl.setopt(pycurl.POSTFIELDS, body)
+             curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
+             curl.setopt(pycurl.HTTPHEADER, [
+                 '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+             curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+             curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+             self._setcurltimeouts(curl, timeout)
+             try:
+                 curl.perform()
+             except Exception as e:
+                 # See the matching comment in get().
+                 raise arvados.errors.HttpError(0, str(e))
+             self._result = {
+                 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                 'body': response_body.getvalue(),
+                 'headers': self._headers,
+                 'error': False,
+             }
+             ok = retry.check_http_response_success(self._result['status_code'])
+             if not ok:
+                 self._result['error'] = arvados.errors.HttpError(
+                     self._result['status_code'],
+                     self._headers.get('x-status-line', 'Error'))
+         except self.HTTP_ERRORS as e:
+             self._result = {
+                 'error': e,
+             }
+             ok = False
+         self._usable = ok != False # still usable if ok is True or None
+         if self._result.get('status_code', None):
+             # Client is functional. See comment in get().
+             self._put_user_agent(curl)
+         else:
+             curl.close()
+         if not ok:
+             _logger.debug("Request fail: PUT %s => %s: %s",
+                           url, type(self._result['error']), str(self._result['error']))
+             return False
+         return True
+
+     def _setcurltimeouts(self, curl, timeouts):
+         """Apply connect and transfer timeouts (in seconds) to a curl handle.
+
+         `timeouts` may be a (connect, transfer) tuple or a single number
+         used for both. A false value leaves the handle's timeouts alone.
+         """
+         if not timeouts:
+             return
+         elif isinstance(timeouts, tuple):
+             conn_t, xfer_t = timeouts
+         else:
+             conn_t, xfer_t = (timeouts, timeouts)
+         curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+         curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+
+     def _headerfunction(self, header_line):
+         """Record one response header line in self._headers.
+
+         Header names are lower-cased. The status line is stored under
+         the pseudo-header 'x-status-line'.
+         """
+         header_line = header_line.decode('iso-8859-1')
+         if header_line.startswith('HTTP/'):
+             # Check for a status line first: more than one can arrive
+             # per request (e.g. "100 Continue" before the final
+             # response), and a later one must replace the earlier one
+             # rather than be appended to the previous header.
+             name = 'x-status-line'
+             value = header_line
+         elif ':' in header_line:
+             name, value = header_line.split(':', 1)
+             name = name.strip().lower()
+             value = value.strip()
+         elif not header_line.strip():
+             # Blank line ending a header block: nothing to record.
+             return
+         elif self._headers:
+             # Continuation of the previous header line.
+             name = self._lastheadername
+             value = self._headers[name] + ' ' + header_line.strip()
+         else:
+             _logger.error("Unexpected header line: %s", header_line)
+             return
+         self._lastheadername = name
+         self._headers[name] = value
+         # Returning None implies all bytes were written
+
+
class KeepWriterThread(threading.Thread):
"""
Write a blob of data to the given Keep server. On success, call
save_response() of the given ThreadLimiter to save the returned
locator.
"""
- def __init__(self, api_token, **kwargs):
+ def __init__(self, keep_service, **kwargs):
super(KeepClient.KeepWriterThread, self).__init__()
- self._api_token = api_token
+ self.service = keep_service
self.args = kwargs
self._success = False
self.run_with_limiter(limiter)
def run_with_limiter(self, limiter):
- _logger.debug("KeepWriterThread %s proceeding %s %s",
+ if self.service.finished():
+ return
+ _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
str(threading.current_thread()),
self.args['data_hash'],
+ len(self.args['data']),
self.args['service_root'])
- h = httplib2.Http(timeout=self.args.get('timeout', None))
- url = self.args['service_root'] + self.args['data_hash']
- headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
- if self.args['using_proxy']:
- # We're using a proxy, so tell the proxy how many copies we
- # want it to store
- headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
- try:
- _logger.debug("Uploading to {}".format(url))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=self.args['data'])
- if (resp['status'] == '401' and
- re.match(r'Timestamp verification failed', content)):
- body = KeepClient.sign_for_old_server(
- self.args['data_hash'],
- self.args['data'])
- h = httplib2.Http(timeout=self.args.get('timeout', None))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=body)
- if re.match(r'^2\d\d$', resp['status']):
- self._success = True
- _logger.debug("KeepWriterThread %s succeeded %s %s",
- str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root'])
+ self._success = bool(self.service.put(
+ self.args['data_hash'],
+ self.args['data'],
+ timeout=self.args.get('timeout', None)))
+ result = self.service.last_result()
+ if self._success:
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ len(self.args['data']),
+ self.args['service_root'])
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
+ try:
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
replicas_stored = 1
- if 'x-keep-replicas-stored' in resp:
- # Tick the 'done' counter for the number of replica
- # reported stored by the server, for the case that
- # we're talking to a proxy or other backend that
- # stores to multiple copies for us.
- try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
- except ValueError:
- pass
- limiter.save_response(content.strip(), replicas_stored)
- else:
- _logger.debug("Request fail: PUT %s => %s %s",
- url, resp['status'], content)
- except (httplib2.HttpLib2Error,
- httplib.HTTPException,
- ssl.SSLError) as e:
- # When using https, timeouts look like ssl.SSLError from here.
- # "SSLError: The write operation timed out"
- _logger.debug("Request fail: PUT %s => %s: %s",
- url, type(e), str(e))
-
-
- def __init__(self, api_client=None, proxy=None, timeout=60,
- api_token=None, local_store=None):
+ limiter.save_response(result['body'].strip(), replicas_stored)
+ elif result.get('status_code', None):
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.args['data_hash'],
+ result['status_code'],
+ result['body'])
+
+
+ def __init__(self, api_client=None, proxy=None,
+ timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
+ api_token=None, local_store=None, block_cache=None,
+ num_retries=0, session=None):
"""Initialize a new KeepClient.
Arguments:
- * api_client: The API client to use to find Keep services. If not
+ :api_client:
+ The API client to use to find Keep services. If not
provided, KeepClient will build one from available Arvados
configuration.
- * proxy: If specified, this KeepClient will send requests to this
- Keep proxy. Otherwise, KeepClient will fall back to the setting
- of the ARVADOS_KEEP_PROXY configuration setting. If you want to
- ensure KeepClient does not use a proxy, pass in an empty string.
- * timeout: The timeout for all HTTP requests, in seconds. Default
- 60.
- * api_token: If you're not using an API client, but only talking
+
+ :proxy:
+ If specified, this KeepClient will send requests to this Keep
+ proxy. Otherwise, KeepClient will fall back to the setting of the
+ ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
+ KeepClient does not use a proxy, pass in an empty string.
+
+ :timeout:
+ The initial timeout (in seconds) for HTTP requests to Keep
+ non-proxy servers. A tuple of two floats is interpreted as
+ (connection_timeout, transfer_timeout). The tuple form follows
+ http://docs.python-requests.org/en/latest/user/advanced/#timeouts,
+ but requests are made with pycurl, so the second value limits the
+ whole transfer (pycurl TIMEOUT_MS) rather than a single read.
+ Because timeouts are often a result of transient server load, the
+ actual connection timeout will be increased by a factor of two on
+ each retry.
+ Default: (2, 300).
+
+ :proxy_timeout:
+ The initial timeout (in seconds) for HTTP requests to
+ Keep proxies. A tuple of two floats is interpreted as
+ (connection_timeout, read_timeout). The behavior described
+ above for adjusting connection timeouts on retry also applies.
+ Default: (20, 300).
+
+ :api_token:
+ If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
api_client and api_token. If you specify neither, KeepClient
will use one available from the Arvados configuration.
- * local_store: If specified, this KeepClient will bypass Keep
+
+ :local_store:
+ If specified, this KeepClient will bypass Keep
services, and save data to the named directory. If unspecified,
KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
+
+ :num_retries:
+ The default number of times to retry failed requests.
+ This will be used as the default num_retries value when get() and
+ put() are called. Default 0.
"""
self.lock = threading.Lock()
if proxy is None:
proxy = config.get('ARVADOS_KEEP_PROXY')
if api_token is None:
- api_token = config.get('ARVADOS_API_TOKEN')
+ if api_client is None:
+ api_token = config.get('ARVADOS_API_TOKEN')
+ else:
+ api_token = api_client.api_token
elif api_client is not None:
raise ValueError(
"can't build KeepClient with both API client and token")
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
+ self.block_cache = block_cache if block_cache else KeepBlockCache()
+ self.timeout = timeout
+ self.proxy_timeout = proxy_timeout
+ self._user_agent_pool = Queue.LifoQueue()
+
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
- self.timeout = timeout
- self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
- self._cache = []
- self._cache_lock = threading.Lock()
+ self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
- self.service_roots = [proxy]
+ self._gateway_services = {}
+ self._keep_services = [{
+ 'uuid': 'proxy',
+ '_service_root': proxy,
+ }]
self.using_proxy = True
+ self._static_services_list = True
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
- self.service_roots = None
+ self._gateway_services = {}
+ self._keep_services = None
self.using_proxy = None
+ self._static_services_list = False
- def shuffled_service_roots(self, hash):
- if self.service_roots is None:
- with self.lock:
- try:
- keep_services = self.api_client.keep_services().accessible()
- except Exception: # API server predates Keep services.
- keep_services = self.api_client.keep_disks().list()
-
- keep_services = keep_services.execute().get('items')
- if not keep_services:
- raise arvados.errors.NoKeepServersError()
-
- self.using_proxy = (keep_services[0].get('service_type') ==
- 'proxy')
-
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
-
- # Build an ordering with which to query the Keep servers based on the
- # contents of the hash.
- # "hash" is a hex-encoded number at least 8 digits
- # (32 bits) long
-
- # seed used to calculate the next keep server from 'pool'
- # to be added to 'pseq'
- seed = hash
-
- # Keep servers still to be added to the ordering
- pool = self.service_roots[:]
-
- # output probe sequence
- pseq = []
-
- # iterate while there are servers left to be assigned
- while len(pool) > 0:
- if len(seed) < 8:
- # ran out of digits in the seed
- if len(pseq) < len(hash) / 4:
- # the number of servers added to the probe sequence is less
- # than the number of 4-digit slices in 'hash' so refill the
- # seed with the last 4 digits and then append the contents
- # of 'hash'.
- seed = hash[-4:] + hash
- else:
- # refill the seed with the contents of 'hash'
- seed += hash
-
- # Take the next 8 digits (32 bytes) and interpret as an integer,
- # then modulus with the size of the remaining pool to get the next
- # selected server.
- probe = int(seed[0:8], 16) % len(pool)
-
- # Append the selected server to the probe sequence and remove it
- # from the pool.
- pseq += [pool[probe]]
- pool = pool[:probe] + pool[probe+1:]
-
- # Remove the digits just used from the seed
- seed = seed[8:]
- _logger.debug(str(pseq))
- return pseq
+ def current_timeout(self, attempt_number):
+     """Return the appropriate timeout to use for this client.
- class CacheSlot(object):
- def __init__(self, locator):
- self.locator = locator
- self.ready = threading.Event()
- self.content = None
+
+     The proxy timeout setting if the backend service is currently a proxy,
+     the regular timeout setting otherwise. The `attempt_number` indicates
+     how many times the operation has been tried already (starting from 0
+     for the first try), and scales the connection timeout portion of the
+     return value accordingly.
- def get(self):
- self.ready.wait()
- return self.content
+
+     Returns a (connection_timeout, transfer_timeout) tuple, or None
+     when the configured timeout is a false value (None or 0), which
+     KeepService treats as "set no timeout". A timeout configured as a
+     single number is used for both parts.
+     """
+     # TODO(twp): the timeout should be a property of a
+     # KeepService, not a KeepClient. See #4488.
+     t = self.proxy_timeout if self.using_proxy else self.timeout
+     if not t:
+         return None
+     if not isinstance(t, tuple):
+         # Accept a plain number, as KeepService._setcurltimeouts does.
+         t = (t, t)
+     return (t[0] * (1 << attempt_number), t[1])
+
+ def build_services_list(self, force_rebuild=False):
+     """Load the list of Keep services from the API server.
+
+     Does nothing when the client was given a static service list, or
+     when a list is already loaded and `force_rebuild` is false.
+     Otherwise sets self._gateway_services, self._keep_services and
+     self.using_proxy, and adds a '_service_root' base URI to every
+     service record.
+
+     Raises arvados.errors.NoKeepServersError when the API server
+     reports no services.
+     """
+     if self._static_services_list:
+         return
+     if self._keep_services and not force_rebuild:
+         return
+     with self.lock:
+         try:
+             request = self.api_client.keep_services().accessible()
+         except Exception:  # API server predates Keep services.
+             request = self.api_client.keep_disks().list()
+
+         services = request.execute().get('items')
+         if not services:
+             raise arvados.errors.NoKeepServersError()
+
+         # Precompute the base URI for each service.
+         for record in services:
+             hostname = record['service_host']
+             if not hostname.startswith('[') and ':' in hostname:
+                 # IPv6 URIs must be formatted like http://[::1]:80/...
+                 hostname = '[' + hostname + ']'
+             scheme = 'https' if record['service_ssl_flag'] else 'http'
+             record['_service_root'] = "{}://{}:{:d}/".format(
+                 scheme, hostname, record['service_port'])
+
+         # Gateway services are only used when specified by UUID,
+         # so there's nothing to gain by filtering them by
+         # service_type.
+         self._gateway_services = dict(
+             (record.get('uuid'), record) for record in services)
+         _logger.debug(str(self._gateway_services))
+
+         self._keep_services = [
+             record for record in services
+             if record.get('service_type') in ('disk', 'proxy')]
+         _logger.debug(str(self._keep_services))
+
+         self.using_proxy = any(
+             record.get('service_type') == 'proxy'
+             for record in self._keep_services)
+ def _service_weight(self, data_hash, service_uuid):
+     """Return the weight of one Keep service for one data block.
+
+     The weight is the hex md5 digest of the block's hash concatenated
+     with the last 15 characters of the service's UUID.
+     """
+     uuid_tail = service_uuid[-15:]
+     digest = hashlib.md5(data_hash + uuid_tail)
+     return digest.hexdigest()
- def set(self, value):
- self.content = value
- self.ready.set()
+ def weighted_service_roots(self, locator, force_rebuild=False):
+     """Return the base URIs to probe for `locator`, in probe order.
+
+     Services named by +K@ hints in the locator come first (when they
+     can be resolved to a URI), followed by the local services sorted
+     from heaviest to lightest weight for this block.
+     """
+     self.build_services_list(force_rebuild)
+
+     def weight_of(service):
+         return self._service_weight(locator.md5sum, service['uuid'])
+
+     roots = []
+
+     # Use the services indicated by the given +K@... remote
+     # service hints, if any are present and can be resolved to a
+     # URI.
+     for hint in locator.hints:
+         if not hint.startswith('K@'):
+             continue
+         hint_length = len(hint)
+         if hint_length == 7:
+             roots.append(
+                 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+         elif hint_length == 29:
+             gateway = self._gateway_services.get(hint[2:])
+             if gateway:
+                 roots.append(gateway['_service_root'])
+
+     # Sort the available local services by weight (heaviest first)
+     # for this locator, and append their service_roots (base URIs)
+     # in that order.
+     by_weight = sorted(self._keep_services, key=weight_of, reverse=True)
+     for service in by_weight:
+         roots.append(service['_service_root'])
+     _logger.debug("{}: {}".format(locator, roots))
+     return roots
+
+ def map_new_services(self, roots_map, locator, force_rebuild, **headers):
+     """Add a KeepService to `roots_map` for every root not yet in it.
+
+     `roots_map` maps Keep service root strings to KeepService objects
+     and is updated in place. The extra `headers` (plus an
+     Authorization header, unless one was supplied) are given to each
+     new KeepService. Returns the current ordered list of root strings.
+     """
+     authorization = "OAuth2 %s" % (self.api_token,)
+     headers.setdefault('Authorization', authorization)
+     local_roots = self.weighted_service_roots(locator, force_rebuild)
+     for root in local_roots:
+         if root in roots_map:
+             continue
+         roots_map[root] = self.KeepService(
+             root, self._user_agent_pool, **headers)
+     return local_roots
- def size(self):
- if self.content == None:
- return 0
- else:
- return len(self.content)
+ @staticmethod
+ def _check_loop_result(result):
+     """Classify one saved RetryLoop result.
+
+     KeepClient RetryLoops save either an exception or a 2-tuple: the
+     actual result of the request, and the number of servers that were
+     available to receive the request this round.
+
+     Returns True if there is a real result, False if there are no
+     more servers available, and None otherwise.
+     """
+     if isinstance(result, Exception):
+         return None
+     outcome, tried_server_count = result
+     if outcome is not None and outcome is not False:
+         return True
+     if tried_server_count < 1:
+         _logger.info("No more Keep services to try; giving up")
+         return False
+     return None
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
- self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
- sm = sum([slot.size() for slot in self._cache])
- while sm > self.cache_max:
- del self._cache[-1]
- sm = sum([slot.size() for a in self._cache])
- finally:
- self._cache_lock.release()
+ def get_from_cache(self, loc):
+     """Fetch a block only if it is in the cache, otherwise return None.
+
+     `loc` is looked up as given; note that get() reserves cache slots
+     under the locator's md5sum.
+     """
+     slot = self.block_cache.get(loc)
+     if slot is None or not slot.ready.is_set():
+         return None
+     return slot.get()
- def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
- or return the existing slot.'''
- self._cache_lock.acquire()
- try:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
+ @retry.retry_method
+ def get(self, loc_s, num_retries=None):
+     """Get data from Keep.
- # Add a new cache slot for the locator
- n = KeepClient.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
+
+     This method fetches one or more blocks of data from Keep. It
+     sends a request to each service named in location hints, then to
+     each Keep service registered with the API server (or the proxy
+     provided when this client was instantiated), in sequence. As
+     soon as one service provides the data, it's returned.
- def get(self, loc_s):
+
+     Arguments:
+     * loc_s: A string of one or more comma-separated locators to fetch.
+       This method returns the concatenation of these blocks.
+     * num_retries: The number of times to retry GET requests to
+       *each* Keep server if it returns temporary failures, with
+       exponential backoff. Note that, in each loop, the method may try
+       to fetch data from every available Keep service, along with any
+       that are named in location hints in the locator. The default value
+       is set when the KeepClient is initialized.
+
+     Raises arvados.errors.NotFoundError when every service tried
+     answered 403, 404 or 410, and arvados.errors.KeepReadError for
+     any other failure.
+     """
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
- expect_hash = locator.md5sum
-
- slot, first = self.reserve_cache(expect_hash)
-
+     slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
v = slot.get()
return v
- try:
- for service_root in self.shuffled_service_roots(expect_hash):
- url = service_root + loc_s
- headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
- 'Accept': 'application/octet-stream'}
- blob = self.get_url(url, headers, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
-
- for hint in locator.hints:
- if not hint.startswith('K@'):
- continue
- url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
- blob = self.get_url(url, {}, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
- except:
- slot.set(None)
- self.cap_cache()
- raise
-
- slot.set(None)
- self.cap_cache()
- raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
-
- def get_url(self, url, headers, expect_hash):
- h = httplib2.Http()
- try:
- _logger.debug("Request: GET %s", url)
- with timer.Timer() as t:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
- len(content), t.msecs,
- (len(content)/(1024*1024))/t.secs)
- if re.match(r'^2\d\d$', resp['status']):
- md5 = hashlib.md5(content).hexdigest()
- if md5 == expect_hash:
- return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
- except Exception as e:
- _logger.debug("Request fail: GET %s => %s: %s",
- url, type(e), str(e))
- return None
+     # Services named by +K@ hints in the locator (a remote keepproxy
+     # prefix, or the UUID of a local gateway service) are placed ahead
+     # of the local services by weighted_service_roots(), which
+     # map_new_services() calls on every pass of the loop below.
+     #
+     # See #3147 for a discussion of the loop implementation. Highlights:
+     # * Refresh the list of Keep services after each failure, in case
+     #   it's being updated.
+     # * Retry until we succeed, we're out of retries, or every available
+     #   service has returned permanent failure.
+     sorted_roots = []
+     roots_map = {}
+     blob = None
+     loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                            backoff_start=2)
+     try:
+         for tries_left in loop:
+             try:
+                 sorted_roots = self.map_new_services(
+                     roots_map, locator,
+                     force_rebuild=(tries_left < num_retries))
+             except Exception as error:
+                 loop.save_result(error)
+                 continue
+
+             # Query KeepService objects that haven't returned
+             # permanent failure, in our specified shuffle order.
+             services_to_try = [roots_map[root]
+                                for root in sorted_roots
+                                if roots_map[root].usable()]
+             for keep_service in services_to_try:
+                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
+                 if blob is not None:
+                     break
+             loop.save_result((blob, len(services_to_try)))
+     finally:
+         # Always cache the result (None on failure), even when the
+         # loop raised: other threads may be blocked in slot.get()
+         # waiting for this slot to be filled.
+         slot.set(blob)
+         self.block_cache.cap_cache()
+     if loop.success():
+         return blob
+
+     # Q: Including 403 is necessary for the Keep tests to continue
+     # passing, but maybe they should expect KeepReadError instead?
+     not_founds = sum(1 for key in sorted_roots
+                      if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+     service_errors = ((key, roots_map[key].last_result()['error'])
+                       for key in sorted_roots)
+     if not roots_map:
+         raise arvados.errors.KeepReadError(
+             "failed to read {}: no Keep services available ({})".format(
+                 loc_s, loop.last_result()))
+     elif not_founds == len(sorted_roots):
+         raise arvados.errors.NotFoundError(
+             "{} not found".format(loc_s), service_errors)
+     else:
+         raise arvados.errors.KeepReadError(
+             "failed to read {}".format(loc_s), service_errors, label="service")
+
# put(): store data in Keep with retries. Returns thread_limiter.response()
# once an attempt reports enough copies, otherwise raises KeepWriteError.
# Lines marked with a minus sign below are the previous implementation that
# this change removes.
+ @retry.retry_method
+ def put(self, data, copies=2, num_retries=None):
+ """Save data in Keep.
+
+ This method will get a list of Keep services from the API server, and
+ send the data to each one simultaneously in a new thread. Once the
+ uploads are finished, if enough copies are saved, this method returns
+ the most recent HTTP response body. If requests fail to upload
+ enough copies, this method raises KeepWriteError.
+
+ Arguments:
+ * data: The string of data to upload.
+ * copies: The number of copies that the user requires be saved.
+ Default 2.
+ * num_retries: The number of times to retry PUT requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. The default value is set when the
+ KeepClient is initialized.
+ """
+
# unicode input is converted with the ascii codec; any other non-str type
# is rejected with ArgumentError.
+ if isinstance(data, unicode):
+ data = data.encode("ascii")
+ elif not isinstance(data, str):
+ raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
- def put(self, data, copies=2):
data_hash = hashlib.md5(data).hexdigest()
- have_copies = 0
- want_copies = copies
- if not (want_copies > 0):
# With copies < 1 nothing is uploaded; only the MD5 hex digest is returned.
+ if copies < 1:
return data_hash
- threads = []
- thread_limiter = KeepClient.ThreadLimiter(want_copies)
- for service_root in self.shuffled_service_roots(data_hash):
- t = KeepClient.KeepWriterThread(
- self.api_token,
- data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- timeout=self.timeout,
- using_proxy=self.using_proxy,
- want_copies=(want_copies if self.using_proxy else 1))
- t.start()
- threads += [t]
- for t in threads:
- t.join()
- if thread_limiter.done() < want_copies:
- # Retry the threads (i.e., services) that failed the first
- # time around.
- threads_retry = []
# The locator is built from the MD5 hex digest, a plus sign and len(data).
+ locator = KeepLocator(data_hash + '+' + str(len(data)))
+
+ headers = {}
+ if self.using_proxy:
+ # Tell the proxy how many copies we want it to store
+ headers['X-Keep-Desired-Replication'] = str(copies)
# roots_map and thread_limiter are created once, outside the retry loop,
# and reused on every iteration.
+ roots_map = {}
+ thread_limiter = KeepClient.ThreadLimiter(copies)
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
# force_rebuild is requested whenever tries_left < num_retries
# (presumably every attempt after the first; confirm against RetryLoop).
# An exception from map_new_services is saved as the result of this
# attempt and the loop moves on to the next one.
+ try:
+ local_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries), **headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
+
# Start one writer thread for each service in roots_map that does not
# report finished(), then join all of them before recording the result.
+ threads = []
+ for service_root, ks in roots_map.iteritems():
+ if ks.finished():
+ continue
+ t = KeepClient.KeepWriterThread(
+ ks,
+ data=data,
+ data_hash=data_hash,
+ service_root=service_root,
+ thread_limiter=thread_limiter,
+ timeout=self.current_timeout(num_retries-tries_left))
+ t.start()
+ threads.append(t)
for t in threads:
- if not t.success():
- _logger.debug("Retrying: PUT %s %s",
- t.args['service_root'],
- t.args['data_hash'])
- retry_with_args = t.args.copy()
- t_retry = KeepClient.KeepWriterThread(self.api_token,
- **retry_with_args)
- t_retry.start()
- threads_retry += [t_retry]
- for t in threads_retry:
t.join()
- have_copies = thread_limiter.done()
- # If we're done, return the response from Keep
- if have_copies >= want_copies:
# Saved result is a pair: (whether thread_limiter.done() reached copies,
# number of threads started in this attempt).
+ loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+ if loop.success():
return thread_limiter.response()
- raise arvados.errors.KeepWriteError(
- "Write fail for %s: wanted %d but wrote %d" %
- (data_hash, want_copies, have_copies))
# Reached only when the loop did not report success. An empty roots_map
# is reported separately from a shortfall in copies.
+ if not roots_map:
+ raise arvados.errors.KeepWriteError(
+ "failed to write {}: no Keep services available ({})".format(
+ data_hash, loop.last_result()))
+ else:
# service_errors is a generator of (service root, error) pairs, limited
# to services whose last result carries a truthy error entry.
+ service_errors = ((key, roots_map[key].last_result()['error'])
+ for key in local_roots
+ if roots_map[key].last_result()['error'])
+ raise arvados.errors.KeepWriteError(
+ "failed to write {} (wanted {} copies but wrote {})".format(
+ data_hash, copies, thread_limiter.done()), service_errors, label="service")
# Removed by this change: a static helper that returned data prefixed with a
# PGP-signed-message style text header containing int(time.time()) and
# data_hash.
- @staticmethod
- def sign_for_old_server(data_hash, data):
- return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
# local_store_put(): new signature adds copies and num_retries; neither is
# referenced in the lines shown here.
+ def local_store_put(self, data, copies=1, num_retries=None):
+ """A stub for put().
- def local_store_put(self, data):
+ This method is used in place of the real put() method when
+ using local storage (see constructor's local_store argument).
+
+ copies and num_retries arguments are ignored: they are here
+ only for the sake of offering the same call signature as
+ put().
+
+ Data stored this way can be retrieved via local_store_get().
+ """
# The returned locator is the MD5 hex digest, a plus sign and len(data).
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
# Opens a file named after the digest with a .tmp suffix under
# self.local_store for writing.
# NOTE(review): the statements between the next two lines are not shown in
# this excerpt (the second is the tail of a call); confirm against the file.
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
# local_store_get(): parses loc_s with KeepLocator and reads the file named
# by locator.md5sum under self.local_store. The new signature adds
# num_retries, which is not referenced in the lines shown here.
# NOTE(review): this excerpt may omit lines between the except clause and
# the statements that follow it; confirm against the full file.
- def local_store_get(self, loc_s):
+ def local_store_get(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
try:
locator = KeepLocator(loc_s)
except ValueError:
return ''
with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
return f.read()
+
+ def is_cached(self, locator):
+ """Look up locator in the block cache.
+
+ Returns whatever self.block_cache.reserve_cache(locator) returns.
+ """
+ # Fix: this call previously passed expect_hash, a name that is neither
+ # a parameter nor a local of this method, so every call raised
+ # NameError. The locator argument is passed instead.
+ # NOTE(review): the key format reserve_cache expects is not visible
+ # here; confirm whether callers should pass a bare hash.
+ return self.block_cache.reserve_cache(locator)