-import gflags
+import cStringIO
+import collections
+import datetime
+import hashlib
import logging
+import math
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
+import socket
+import ssl
+import sys
import threading
import timer
-import datetime
-import ssl
-import socket
-import requests
+import urlparse
import arvados
import arvados.config as config
_logger = logging.getLogger('arvados.keep')
global_client_object = None
+
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Only fill in the constants the running Python does not already expose.
    for _const_name, _const_value in (('TCP_KEEPALIVE', 0x010),
                                      ('TCP_KEEPINTVL', 0x101),
                                      ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _const_name):
            setattr(socket, _const_name, _const_value)
    # Do not leave the loop variables behind as module globals.
    del _const_name, _const_value
+
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
self.size = None
for hint in pieces:
if self.HINT_RE.match(hint) is None:
- raise ValueError("unrecognized hint data {}".format(hint))
+ raise ValueError("invalid hint format: {}".format(hint))
elif hint.startswith('A'):
self.parse_permission_hint(hint)
else:
self.permission_hint()] + self.hints
if s is not None)
def stripped(self):
    """Return the locator without any hints.

    The result is "md5sum+size" when the size is known, and the bare
    md5sum when self.size is None.
    """
    if self.size is None:
        return self.md5sum
    return "%s+%i" % (self.md5sum, self.size)
+
def _make_hex_prop(name, length):
# Build and return a new property with the given name that
# must be a hex string of the given length.
return getattr(self, data_name)
def setter(self, hex_str):
if not arvados.util.is_hex(hex_str, length):
- raise ValueError("{} must be a {}-digit hex string: {}".
+ raise ValueError("{} is not a {}-digit hex string: {}".
format(name, length, hex_str))
setattr(self, data_name, hex_str)
return property(getter, setter)
self._cache_lock = threading.Lock()
class CacheSlot(object):
+ __slots__ = ("locator", "ready", "content")
+
def __init__(self, locator):
self.locator = locator
self.ready = threading.Event()
def cap_cache(self):
'''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
+ with self._cache_lock:
# Select all slots except those where ready.is_set() and content is
# None (that means there was an error reading the block).
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
del self._cache[i]
break
sm = sum([slot.size() for slot in self._cache])
- finally:
- self._cache_lock.release()
+
def _get(self, locator):
    """Return the cache slot whose locator equals `locator`, or None.

    A slot that is found anywhere but the front of self._cache is moved
    to index 0, so the list stays ordered most-recently-used first.

    This method takes no lock itself; get() and reserve_cache() call it
    while holding self._cache_lock.
    """
    # Test if the locator is already in the cache
    for i, slot in enumerate(self._cache):
        if slot.locator == locator:
            if i != 0:
                # move it to the front (safe: we return right after
                # mutating the list we are iterating over)
                del self._cache[i]
                self._cache.insert(0, slot)
            return slot
    return None
+
def get(self, locator):
    """Look up `locator` in the cache while holding the cache lock.

    Returns whatever self._get() returns: the matching slot, or None
    when the locator is not cached.
    """
    with self._cache_lock:
        found = self._get(locator)
    return found
def reserve_cache(self, locator):
    '''Reserve a cache slot for the specified locator,
    or return the existing slot.

    Returns a (slot, first) pair. `first` is True when a new slot was
    created and inserted at the front of the cache, and False when an
    existing slot was found.
    '''
    with self._cache_lock:
        slot = self._get(locator)
        # _get() signals "not cached" with None, so test for exactly
        # that rather than relying on the slot object's truthiness.
        if slot is not None:
            return slot, False
        # Add a new cache slot for the locator
        slot = KeepBlockCache.CacheSlot(locator)
        self._cache.insert(0, slot)
        return slot, True
+
class Counter(object):
    """A running total that is only read or updated under a lock."""

    def __init__(self, v=0):
        self._val = v
        self._lk = threading.Lock()

    def add(self, v):
        """Add `v` to the total while holding the lock."""
        with self._lk:
            self._val += v

    def get(self):
        """Return the current total, read while holding the lock."""
        with self._lk:
            current = self._val
        return current
- # Add a new cache slot for the locator
- n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 300 seconds
+ # Default Keep server read timeout: 256 seconds
+ # Default Keep server bandwidth minimum: 32768 bytes per second
# Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 300 seconds
- DEFAULT_TIMEOUT = (2, 300)
- DEFAULT_PROXY_TIMEOUT = (20, 300)
+ # Default Keep proxy read timeout: 256 seconds
+ # Default Keep proxy bandwidth minimum: 32768 bytes per second
+ DEFAULT_TIMEOUT = (2, 256, 32768)
+ DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
- class ThreadLimiter(object):
- """
- Limit the number of threads running at a given time to
- {desired successes} minus {successes reported}. When successes
- reported == desired, wake up the remaining threads and tell
- them to quit.
- Should be used in a "with" block.
+ class KeepService(object):
+ """Make requests to a single Keep service, and track results.
+
+ A KeepService is intended to last long enough to perform one
+ transaction (GET or PUT) against one Keep service. This can
+ involve calling either get() or put() multiple times in order
+ to retry after transient failures. However, calling both get()
+ and put() on a single instance -- or using the same instance
+ to access two different Keep services -- will not produce
+ sensible behavior.
"""
- def __init__(self, todo):
- self._todo = todo
- self._done = 0
- self._response = None
- self._todo_lock = threading.Semaphore(todo)
- self._done_lock = threading.Lock()
-
- def __enter__(self):
- self._todo_lock.acquire()
- return self
-
- def __exit__(self, type, value, traceback):
- self._todo_lock.release()
-
- def shall_i_proceed(self):
- """
- Return true if the current thread should do stuff. Return
- false if the current thread should just stop.
- """
- with self._done_lock:
- return (self._done < self._todo)
-
- def save_response(self, response_body, replicas_stored):
- """
- Records a response body (a locator, possibly signed) returned by
- the Keep server. It is not necessary to save more than
- one response, since we presume that any locator returned
- in response to a successful request is valid.
- """
- with self._done_lock:
- self._done += replicas_stored
- self._response = response_body
-
- def response(self):
- """
- Returns the body from the response to a PUT request.
- """
- with self._done_lock:
- return self._response
-
- def done(self):
- """
- Return how many successes were reported.
- """
- with self._done_lock:
- return self._done
-
- class KeepService(object):
- # Make requests to a single Keep service, and track results.
- HTTP_ERRORS = (requests.exceptions.RequestException,
- socket.error, ssl.SSLError)
+ HTTP_ERRORS = (
+ socket.error,
+ ssl.SSLError,
+ arvados.errors.HttpError,
+ )
- def __init__(self, root, **headers):
def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
             upload_counter=None,
             download_counter=None, **headers):
    """Set up request state for the Keep service rooted at `root`.

    :root: base URI; get() and put() append the locator or hash to it.
    :user_agent_pool: queue used to borrow and return curl handles.
    :upload_counter, download_counter: optional objects with an add()
      method, fed the number of bytes sent or received.
    :headers: extra request headers, used for both GET and PUT.

    NOTE(review): the default `user_agent_pool` is evaluated once, when
    the function is defined, so every instance built without an
    explicit pool shares that one queue. Confirm this is intended.
    """
    self.root = root
    self._user_agent_pool = user_agent_pool
    self.upload_counter = upload_counter
    self.download_counter = download_counter
    # No request made yet: no error recorded, still worth trying.
    self._result = {'error': None}
    self._usable = True
    self._session = None
    # GET requests use a copy of the headers plus an Accept default
    # (a caller-supplied 'Accept' wins); PUT uses the mapping as given.
    self.get_headers = dict({'Accept': 'application/octet-stream'}, **headers)
    self.put_headers = headers
def usable(self):
    """Is it worth attempting a request?

    Reports the flag that get() and put() update after each attempt.
    """
    still_usable = self._usable
    return still_usable
def finished(self):
    """Did the request succeed or encounter permanent failure?

    True when the last result recorded no error (error == False), or
    when the service has been marked unusable.
    """
    succeeded = self._result['error'] == False
    return succeeded or not self._usable
+
def last_result(self):
    """Return the result dict recorded by the most recent get() or put().

    It always has an 'error' key; 'status_code', 'body' and 'headers'
    are present only when an HTTP response was received.
    """
    result = self._result
    return result
- def last_status(self):
def _get_user_agent(self):
    """Borrow a curl handle from the pool, or create one if it is empty."""
    try:
        agent = self._user_agent_pool.get(block=False)
    except Queue.Empty:
        agent = pycurl.Curl()
    return agent
- def get(self, locator, timeout=None):
def _put_user_agent(self, ua):
    """Reset the curl handle `ua` and return it to the pool.

    Best effort: if resetting or queueing fails, the handle is closed
    instead of being pooled, and the error is not reported.
    """
    try:
        ua.reset()
        self._user_agent_pool.put(ua, block=False)
    except Exception:
        # Narrowed from a bare "except:" so KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        ua.close()
+
def _socket_open(self, *args, **kwargs):
    """Forward to the socket opener matching the number of arguments.

    Exactly two arguments (positional plus keyword) go to the
    (purpose, address) variant; any other count goes to the
    (family, socktype, protocol[, address]) variant.
    """
    argc = len(args) + len(kwargs)
    if argc == 2:
        opener = self._socket_open_pycurl_7_21_5
    else:
        opener = self._socket_open_pycurl_7_19_3
    return opener(*args, **kwargs)
+
def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
    """Adapt the four-argument callback form to the (purpose, address) form.

    The arguments are packed into an object with family, socktype,
    protocol and addr attributes, and passed on with purpose=None.
    """
    address_type = collections.namedtuple(
        'Address', ['family', 'socktype', 'protocol', 'addr'])
    packed = address_type(family, socktype, protocol, address)
    return self._socket_open_pycurl_7_21_5(purpose=None, address=packed)
+
def _socket_open_pycurl_7_21_5(self, purpose, address):
    """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE

    Creates the socket described by `address` (family, socktype and
    protocol attributes) with TCP keepalive enabled. `purpose` is unused.
    """
    keepalive_secs = 75
    sock = socket.socket(address.family, address.socktype, address.protocol)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Will throw invalid protocol error on mac. This test prevents that.
    if hasattr(socket, 'TCP_KEEPIDLE'):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, keepalive_secs)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, keepalive_secs)
    return sock
+
def get(self, locator, method="GET", timeout=None):
    """Fetch one block from this service, or probe it with HEAD.

    :locator: a KeepLocator object; str(locator) is appended to
      self.root to form the URL, and locator.md5sum is checked against
      the response body.
    :method: "GET" (default) or "HEAD".
    :timeout: passed to _setcurltimeouts().

    Returns the response body for a successful GET whose MD5 matches,
    True for a successful HEAD, and None on any failure. The outcome
    is also recorded in self._result (see last_result()).
    """
    url = self.root + str(locator)
    _logger.debug("Request: %s %s", method, url)
    curl = self._get_user_agent()
    ok = None
    try:
        with timer.Timer() as t:
            self._headers = {}
            response_body = cStringIO.StringIO()
            curl.setopt(pycurl.NOSIGNAL, 1)
            curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
            curl.setopt(pycurl.URL, url.encode('utf-8'))
            curl.setopt(pycurl.HTTPHEADER, [
                '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
            curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
            curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
            if method == "HEAD":
                curl.setopt(pycurl.NOBODY, True)
            self._setcurltimeouts(curl, timeout)

            try:
                curl.perform()
            except Exception as e:
                # Normalize transport failures to HttpError with
                # status 0 so the handler below records them.
                raise arvados.errors.HttpError(0, str(e))
            self._result = {
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
                'error': False,
            }

        ok = retry.check_http_response_success(self._result['status_code'])
        if not ok:
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
    except self.HTTP_ERRORS as e:
        self._result = {
            'error': e,
        }
    # Still usable if ok is True or None (no verdict was reached).
    self._usable = ok != False
    if self._result.get('status_code', None):
        # The client worked well enough to get an HTTP status
        # code, so presumably any problems are just on the
        # server side and it's OK to reuse the client.
        self._put_user_agent(curl)
    else:
        # Don't return this client to the pool, in case it's
        # broken.
        curl.close()
    if not ok:
        # Log the method actually used; this used to say "GET" even
        # for HEAD requests.
        _logger.debug("Request fail: %s %s => %s: %s",
                      method, url,
                      type(self._result['error']), str(self._result['error']))
        return None
    if method == "HEAD":
        # Content-Length lives in the parsed response headers (stored
        # under lower-cased names), not at the top level of the result.
        _logger.info("HEAD %s: %s bytes",
                     self._result['status_code'],
                     self._result['headers'].get('content-length'))
        return True

    _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                 self._result['status_code'],
                 len(self._result['body']),
                 t.msecs,
                 (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)

    if self.download_counter:
        self.download_counter.add(len(self._result['body']))
    resp_md5 = hashlib.md5(self._result['body']).hexdigest()
    if resp_md5 != locator.md5sum:
        _logger.warning("Checksum fail: md5(%s) = %s",
                        url, resp_md5)
        self._result['error'] = arvados.errors.HttpError(
            0, 'Checksum fail')
        return None
    return self._result['body']
def put(self, hash_s, body, timeout=None):
    """Upload one block to this service.

    :hash_s: appended to self.root to form the request URL.
    :body: the data to send; its length is sent as the upload size.
    :timeout: passed to _setcurltimeouts().

    Returns True when the response status is judged successful by
    retry.check_http_response_success(), False otherwise. The outcome
    is also recorded in self._result (see last_result()).
    """
    url = self.root + hash_s
    _logger.debug("Request: PUT %s", url)
    curl = self._get_user_agent()
    # ok stays None if the request fails before a status is evaluated.
    ok = None
    try:
        with timer.Timer() as t:
            self._headers = {}
            body_reader = cStringIO.StringIO(body)
            response_body = cStringIO.StringIO()
            curl.setopt(pycurl.NOSIGNAL, 1)
            curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
            curl.setopt(pycurl.URL, url.encode('utf-8'))
            # Using UPLOAD tells cURL to wait for a "go ahead" from the
            # Keep server (in the form of a HTTP/1.1 "100 Continue"
            # response) instead of sending the request body immediately.
            # This allows the server to reject the request if the request
            # is invalid or the server is read-only, without waiting for
            # the client to send the entire block.
            curl.setopt(pycurl.UPLOAD, True)
            curl.setopt(pycurl.INFILESIZE, len(body))
            curl.setopt(pycurl.READFUNCTION, body_reader.read)
            curl.setopt(pycurl.HTTPHEADER, [
                '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
            curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
            curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
            self._setcurltimeouts(curl, timeout)
            try:
                curl.perform()
            except Exception as e:
                # Re-raise as HttpError (status 0) so the HTTP_ERRORS
                # handler below records it.
                raise arvados.errors.HttpError(0, str(e))
            self._result = {
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
                'error': False,
            }
        ok = retry.check_http_response_success(self._result['status_code'])
        if not ok:
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
    except self.HTTP_ERRORS as e:
        # No 'status_code' key here, so the handle is closed below
        # rather than returned to the pool.
        self._result = {
            'error': e,
        }
    self._usable = ok != False # still usable if ok is True or None
    if self._result.get('status_code', None):
        # Client is functional. See comment in get().
        self._put_user_agent(curl)
    else:
        curl.close()
    if not ok:
        _logger.debug("Request fail: PUT %s => %s: %s",
                      url, type(self._result['error']), str(self._result['error']))
        return False
    _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                 self._result['status_code'],
                 len(body),
                 t.msecs,
                 (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
    if self.upload_counter:
        self.upload_counter.add(len(body))
    return True
def _setcurltimeouts(self, curl, timeouts):
    """Apply connection and low-speed limits from `timeouts` to `curl`.

    `timeouts` may be falsy (nothing is set), a single number (used as
    both the connection and transfer time), or a tuple of
    (connection, transfer) or (connection, transfer, bandwidth).
    When no bandwidth is given, KeepClient.DEFAULT_TIMEOUT[2] is used.
    """
    if not timeouts:
        return
    if isinstance(timeouts, tuple):
        if len(timeouts) == 2:
            # Fill in the default minimum bandwidth.
            timeouts = timeouts + (KeepClient.DEFAULT_TIMEOUT[2],)
        conn_t, xfer_t, bandwidth_bps = timeouts
    else:
        conn_t = xfer_t = timeouts
        bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
    # Connection timeout is set in milliseconds; the low-speed limits
    # are rounded up to whole numbers.
    curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
    curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
    curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+
def _headerfunction(self, header_line):
    """Record one response header line in self._headers.

    Installed as the curl HEADERFUNCTION callback by get() and put().

    * A status line ("HTTP/...") is stored under 'x-status-line',
      replacing any earlier one, so after an interim response such as
      "100 Continue" the entry holds the final status line.
    * "Name: value" lines are stored under the lower-cased name.
    * The blank line that ends a header block is ignored.
    * Any other line is appended, after a space, to the most recently
      stored header as a continuation.

    Returns None, which tells pycurl the whole line was consumed.
    """
    header_line = header_line.decode('iso-8859-1')
    text = header_line.strip()
    if not text:
        # End-of-headers marker. It used to be treated as a
        # continuation and added a trailing space to the last header.
        return
    if header_line.startswith('HTTP/'):
        # Checked before the ':' test so a reason phrase containing a
        # colon is not taken for a header, and before the continuation
        # test so a second status line is not glued onto the first.
        name = 'x-status-line'
        value = text
    elif ':' in header_line:
        name, value = header_line.split(':', 1)
        name = name.strip().lower()
        value = value.strip()
    elif self._headers:
        name = self._lastheadername
        value = self._headers[name] + ' ' + text
    else:
        _logger.error("Unexpected header line: %s", header_line)
        return
    self._lastheadername = name
    self._headers[name] = value
    # Returning None implies all bytes were written
+
+
class KeepWriterQueue(Queue.Queue):
    """Queue of (service, service_root) write tasks plus shared progress.

    Besides the queued tasks, it tracks how many copies are wanted, how
    many have been stored, the last success response, and how many
    attempts may currently be handed out (pending_tries).
    """

    def __init__(self, copies):
        Queue.Queue.__init__(self) # Old-style superclass
        self.wanted_copies = copies
        self.successful_copies = 0
        self.response = None
        # Guards successful_copies and response.
        self.successful_copies_lock = threading.Lock()
        # Attempts that may be handed out right now; starts at the
        # number of copies wanted.
        self.pending_tries = copies
        # Guards pending_tries and wakes threads blocked in
        # get_next_task().
        self.pending_tries_notification = threading.Condition()

    def write_success(self, response, replicas_nr):
        """Record `replicas_nr` stored copies and wake all waiters."""
        with self.successful_copies_lock:
            self.successful_copies += replicas_nr
            self.response = response
        with self.pending_tries_notification:
            self.pending_tries_notification.notify_all()

    def write_fail(self, ks):
        """Allow one more attempt and wake one waiter. `ks` is unused."""
        with self.pending_tries_notification:
            self.pending_tries += 1
            self.pending_tries_notification.notify()

    def pending_copies(self):
        """Return how many wanted copies have not been stored yet."""
        with self.successful_copies_lock:
            return self.wanted_copies - self.successful_copies

    def get_next_task(self):
        """Return the next (service, service_root) pair to try.

        Raises Queue.Empty when enough copies are stored (after
        draining the queue) or when no tasks remain. Otherwise waits
        on the condition until an attempt becomes available.
        """
        with self.pending_tries_notification:
            while True:
                if self.pending_copies() < 1:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    self.pending_tries_notification.notify_all()
                    # Drain the queue and then raise Queue.Empty
                    while True:
                        self.get_nowait()
                        self.task_done()
                elif self.pending_tries > 0:
                    # get_nowait() raises Queue.Empty if nothing is left.
                    service, service_root = self.get_nowait()
                    if service.finished():
                        # Skip services that already reached a result.
                        self.task_done()
                        continue
                    self.pending_tries -= 1
                    return service, service_root
                elif self.empty():
                    self.pending_tries_notification.notify_all()
                    raise Queue.Empty
                else:
                    # Tasks remain but no attempt is available yet.
                    self.pending_tries_notification.wait()
+
+
class KeepWriterThreadPool(object):
    """A set of KeepWriterThread workers sharing one KeepWriterQueue."""

    def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
        self.total_task_nr = 0
        self.wanted_copies = copies
        # One thread is enough unless each service is known to store
        # fewer replicas than the number of copies wanted.
        if max_service_replicas and max_service_replicas < copies:
            num_threads = int(math.ceil(float(copies) / max_service_replicas))
        else:
            num_threads = 1
        _logger.debug("Pool max threads is %d", num_threads)
        self.workers = []
        self.queue = KeepClient.KeepWriterQueue(copies)
        # Create workers
        for _ in range(num_threads):
            self.workers.append(
                KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout))

    def add_task(self, ks, service_root):
        """Queue one (service, service_root) pair for the workers."""
        task = (ks, service_root)
        self.queue.put(task)
        self.total_task_nr += 1

    def done(self):
        """Return the number of copies recorded as stored so far."""
        return self.queue.successful_copies

    def join(self):
        """Start every worker, then block until the queue is processed."""
        # Start workers
        for thread in self.workers:
            thread.start()
        # Wait for finished work
        self.queue.join()

    def response(self):
        """Return the response recorded by the last successful write."""
        return self.queue.response
+
+
class KeepWriterThread(threading.Thread):
    """Worker that takes write tasks from a queue and PUTs the data.

    Each task is a (service, service_root) pair obtained from
    queue.get_next_task(). Results are reported back through
    queue.write_success() or queue.write_fail().
    """

    class TaskFailed(RuntimeError):
        """Raised by do_task() when the PUT did not succeed.

        This is a real exception class rather than one shared exception
        instance, so each failure raises its own object.
        """
        pass

    def __init__(self, queue, data, data_hash, timeout=None):
        super(KeepClient.KeepWriterThread, self).__init__()
        self.timeout = timeout
        self.queue = queue
        self.data = data
        self.data_hash = data_hash
        self.daemon = True

    def run(self):
        """Process tasks until the queue raises Queue.Empty."""
        while True:
            try:
                service, service_root = self.queue.get_next_task()
            except Queue.Empty:
                return
            try:
                locator, copies = self.do_task(service, service_root)
            except Exception as e:
                # A failed PUT is expected and was already logged;
                # anything else gets a traceback.
                if not isinstance(e, self.TaskFailed):
                    _logger.exception("Exception in KeepWriterThread")
                self.queue.write_fail(service)
            else:
                self.queue.write_success(locator, copies)
            finally:
                self.queue.task_done()

    def do_task(self, service, service_root):
        """PUT self.data to `service`.

        Returns (response body stripped of whitespace, replicas stored).
        Raises TaskFailed if the PUT was unsuccessful.
        """
        success = bool(service.put(self.data_hash,
                                   self.data,
                                   timeout=self.timeout))
        result = service.last_result()

        if not success:
            if result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.data_hash,
                              result['status_code'],
                              result['body'])
            raise self.TaskFailed()

        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                      str(threading.current_thread()),
                      self.data_hash,
                      len(self.data),
                      service_root)
        # Use the replica count the server reports; assume one copy
        # when the header is missing or not a number.
        try:
            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
        except (KeyError, ValueError):
            replicas_stored = 1

        return result['body'].strip(), replicas_stored
def __init__(self, api_client=None, proxy=None,
timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
- num_retries=0):
+ num_retries=0, session=None):
"""Initialize a new KeepClient.
Arguments:
- * api_client: The API client to use to find Keep services. If not
+ :api_client:
+ The API client to use to find Keep services. If not
provided, KeepClient will build one from available Arvados
configuration.
- * proxy: If specified, this KeepClient will send requests to this
- Keep proxy. Otherwise, KeepClient will fall back to the setting
- of the ARVADOS_KEEP_PROXY configuration setting. If you want to
- ensure KeepClient does not use a proxy, pass in an empty string.
- * timeout: The timeout (in seconds) for HTTP requests to Keep
- non-proxy servers. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout): see
- http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
- Default: (2, 300).
- * proxy_timeout: The timeout (in seconds) for HTTP requests to
- Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). Default: (20, 300).
- * api_token: If you're not using an API client, but only talking
+
+ :proxy:
+ If specified, this KeepClient will send requests to this Keep
+ proxy. Otherwise, KeepClient will fall back to the setting of the
+ ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
+ If you want to ensure KeepClient does not use a proxy, pass in an
+ empty string.
+
+ :timeout:
+ The initial timeout (in seconds) for HTTP requests to Keep
+ non-proxy servers. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). A connection
+ will be aborted if the average traffic rate falls below
+ minimum_bandwidth bytes per second over an interval of read_timeout
+ seconds. Because timeouts are often a result of transient server
+ load, the actual connection timeout will be increased by a factor
+ of two on each retry.
+ Default: (2, 256, 32768).
+
+ :proxy_timeout:
+ The initial timeout (in seconds) for HTTP requests to
+ Keep proxies. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+ described above for adjusting connection timeouts on retry also
+ applies.
+ Default: (20, 256, 32768).
+
+ :api_token:
+ If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
api_client and api_token. If you specify neither, KeepClient
will use one available from the Arvados configuration.
- * local_store: If specified, this KeepClient will bypass Keep
+
+ :local_store:
+ If specified, this KeepClient will bypass Keep
services, and save data to the named directory. If unspecified,
KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
- * num_retries: The default number of times to retry failed requests.
+
+ :num_retries:
+ The default number of times to retry failed requests.
This will be used as the default num_retries value when get() and
put() are called. Default 0.
"""
self.lock = threading.Lock()
if proxy is None:
- proxy = config.get('ARVADOS_KEEP_PROXY')
+ if config.get('ARVADOS_KEEP_SERVICES'):
+ proxy = config.get('ARVADOS_KEEP_SERVICES')
+ else:
+ proxy = config.get('ARVADOS_KEEP_PROXY')
if api_token is None:
if api_client is None:
api_token = config.get('ARVADOS_API_TOKEN')
self.block_cache = block_cache if block_cache else KeepBlockCache()
self.timeout = timeout
self.proxy_timeout = proxy_timeout
+ self._user_agent_pool = Queue.LifoQueue()
+ self.upload_counter = Counter()
+ self.download_counter = Counter()
+ self.put_counter = Counter()
+ self.get_counter = Counter()
+ self.hits_counter = Counter()
+ self.misses_counter = Counter()
if local_store:
self.local_store = local_store
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.max_replicas_per_service = None
if proxy:
- if not proxy.endswith('/'):
- proxy += '/'
+ proxy_uris = proxy.split()
+ for i in range(len(proxy_uris)):
+ if not proxy_uris[i].endswith('/'):
+ proxy_uris[i] += '/'
+ # URL validation
+ url = urlparse.urlparse(proxy_uris[i])
+ if not (url.scheme and url.netloc):
+ raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
self.api_token = api_token
+ self._gateway_services = {}
self._keep_services = [{
- 'uuid': 'proxy',
- '_service_root': proxy,
- }]
+ 'uuid': "00000-bi6l4-%015d" % idx,
+ 'service_type': 'proxy',
+ '_service_root': uri,
+ } for idx, uri in enumerate(proxy_uris)]
+ self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
else:
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
+ self._gateway_services = {}
self._keep_services = None
+ self._writable_services = None
self.using_proxy = None
self._static_services_list = False
- def current_timeout(self):
- """Return the appropriate timeout to use for this client: the proxy
- timeout setting if the backend service is currently a proxy,
- the regular timeout setting otherwise.
def current_timeout(self, attempt_number):
    """Return the appropriate timeout to use for this client.

    The proxy timeout setting if the backend service is currently a proxy,
    the regular timeout setting otherwise.  The `attempt_number` indicates
    how many times the operation has been tried already (starting from 0
    for the first try), and scales the connection timeout portion of the
    return value accordingly.

    A falsy setting (None, 0, empty) is returned unchanged, and a single
    number is expanded to (connection, read) with only the connection
    part scaled. Both forms are accepted by
    KeepService._setcurltimeouts() and used to raise an error here.
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient.  See #4488.
    t = self.proxy_timeout if self.using_proxy else self.timeout
    if not t:
        return t
    # The connection timeout doubles on every retry.
    scale = 1 << attempt_number
    try:
        parts = len(t)
    except TypeError:
        # Scalar: the same number serves as both timeouts.
        return (t * scale, t)
    if parts == 2:
        return (t[0] * scale, t[1])
    else:
        return (t[0] * scale, t[1], t[2])
def _any_nondisk_services(self, service_list):
    """Return True if any service in `service_list` is not of type 'disk'.

    A service with no 'service_type' entry counts as 'disk'.
    """
    for ks in service_list:
        if ks.get('service_type', 'disk') != 'disk':
            return True
    return False
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- self._keep_services = keep_services.execute().get('items')
- if not self._keep_services:
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks['uuid']: ks for ks in
+ keep_services.execute()['items']}
+ if not self._gateway_services:
raise arvados.errors.NoKeepServersError()
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
-
# Precompute the base URI for each service.
- for r in self._keep_services:
- r['_service_root'] = "{}://[{}]:{:d}/".format(
+ for r in self._gateway_services.itervalues():
+ host = r['service_host']
+ if not host.startswith('[') and host.find(':') >= 0:
+ # IPv6 URIs must be formatted like http://[::1]:80/...
+ host = '[' + host + ']'
+ r['_service_root'] = "{}://{}:{:d}/".format(
'https' if r['service_ssl_flag'] else 'http',
- r['service_host'],
+ host,
r['service_port'])
- _logger.debug(str(self._keep_services))
+
+ _logger.debug(str(self._gateway_services))
+ self._keep_services = [
+ ks for ks in self._gateway_services.itervalues()
+ if not ks.get('service_type', '').startswith('gateway:')]
+ self._writable_services = [ks for ks in self._keep_services
+ if not ks.get('read_only')]
+
+ # For disk type services, max_replicas_per_service is 1
+ # It is unknown (unlimited) for other service types.
+ if self._any_nondisk_services(self._writable_services):
+ self.max_replicas_per_service = None
+ else:
+ self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
"""
return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
- def weighted_service_roots(self, data_hash, force_rebuild=False):
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
    """Return an array of Keep service endpoints, in the order in
    which they should be probed when reading or writing data with
    the given hash+hints.

    Roots named by +K@ hints come first, followed by the local
    services (only the writable ones when need_writable is set),
    heaviest weight first. Also updates self.using_proxy.
    """
    self.build_services_list(force_rebuild)

    sorted_roots = []
    # Use the services indicated by the given +K@... remote
    # service hints, if any are present and can be resolved to a
    # URI.
    for hint in locator.hints:
        if not hint.startswith('K@'):
            continue
        if len(hint) == 7:
            sorted_roots.append(
                "https://keep.{}.arvadosapi.com/".format(hint[2:]))
        elif len(hint) == 29:
            svc = self._gateway_services.get(hint[2:])
            if svc:
                sorted_roots.append(svc['_service_root'])

    # Sort the available local services by weight (heaviest first)
    # for this locator, and return their service_roots (base URIs)
    # in that order.
    use_services = self._writable_services if need_writable else self._keep_services
    self.using_proxy = self._any_nondisk_services(use_services)
    by_weight = sorted(
        use_services,
        reverse=True,
        key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))
    sorted_roots.extend([svc['_service_root'] for svc in by_weight])
    _logger.debug("{}: {}".format(locator, sorted_roots))
    return sorted_roots
- def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
    """Add KeepService objects for newly discovered roots to roots_map.

    roots_map is a dictionary, mapping Keep service root strings to
    KeepService objects. Poll for Keep services, and add any new ones
    to roots_map (existing entries are left alone). Return the
    current list of local root strings.
    """
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
    for root in local_roots:
        if root in roots_map:
            continue
        roots_map[root] = self.KeepService(
            root, self._user_agent_pool,
            upload_counter=self.upload_counter,
            download_counter=self.download_counter,
            **headers)
    return local_roots
@staticmethod
else:
return None
+ def get_from_cache(self, loc):
+ """Return the cached block for loc if it is ready, otherwise None.
+
+ Only self.block_cache is consulted. A slot that exists but whose
+ 'ready' event is not yet set is treated the same as a missing one.
+ """
+ cached = self.block_cache.get(loc)
+ is_ready = cached is not None and cached.ready.is_set()
+ return cached.get() if is_ready else None
+
+ @retry.retry_method
+ def head(self, loc_s, num_retries=None):
+ """Look up a block in Keep using HEAD requests.
+
+ Delegates to _get_or_head() with method="HEAD". For a single
+ locator that lookup returns True on success rather than the
+ block's contents.
+ """
+ return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+
@retry.retry_method
def get(self, loc_s, num_retries=None):
+ """Fetch block data from Keep by delegating to _get_or_head() with method="GET"."""
+ return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+
+ def _get_or_head(self, loc_s, method="GET", num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
"""
+ # NOTE(review): comma-separated locators are always fetched with
+ # get(), regardless of 'method' and without forwarding
+ # num_retries -- confirm this is intended for HEAD requests.
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
- locator = KeepLocator(loc_s)
- expect_hash = locator.md5sum
- slot, first = self.block_cache.reserve_cache(expect_hash)
- if not first:
- v = slot.get()
- return v
+ self.get_counter.add(1)
+
+ locator = KeepLocator(loc_s)
+ if method == "GET":
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
+ if not first:
+ self.hits_counter.add(1)
+ v = slot.get()
+ return v
+
+ self.misses_counter.add(1)
+
+ # Services named by +K@ hints in the locator are resolved by
+ # weighted_service_roots(), which map_new_services() calls in the
+ # retry loop below, so no separate hint lookup is needed here.
# See #3147 for a discussion of the loop implementation. Highlights:
# * Refresh the list of Keep services after each failure, in case
# it's being updated.
# * Retry until we succeed, we're out of retries, or every available
# service has returned permanent failure.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@')]
- # Map root URLs their KeepService objects.
- roots_map = {root: self.KeepService(root) for root in hint_roots}
+ sorted_roots = []
+ roots_map = {}
blob = None
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
for tries_left in loop:
try:
- local_roots = self.map_new_services(
- roots_map, expect_hash,
- force_rebuild=(tries_left < num_retries))
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False)
except Exception as error:
loop.save_result(error)
continue
# Query KeepService objects that haven't returned
# permanent failure, in our specified shuffle order.
services_to_try = [roots_map[root]
- for root in (local_roots + hint_roots)
+ for root in sorted_roots
if roots_map[root].usable()]
for keep_service in services_to_try:
- blob = keep_service.get(locator, timeout=self.current_timeout())
+ blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
if blob is not None:
break
loop.save_result((blob, len(services_to_try)))
# Always cache the result, then return it if we succeeded.
- slot.set(blob)
- self.block_cache.cap_cache()
+ if method == "GET":
+ slot.set(blob)
+ self.block_cache.cap_cache()
if loop.success():
- return blob
+ if method == "HEAD":
+ return True
+ else:
+ return blob
- try:
- all_roots = local_roots + hint_roots
- except NameError:
- # We never successfully fetched local_roots.
- all_roots = hint_roots
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
- not_founds = sum(1 for key in all_roots
- if roots_map[key].last_status() in {403, 404, 410})
- service_errors = ((key, roots_map[key].last_result)
- for key in all_roots)
+ not_founds = sum(1 for key in sorted_roots
+ if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+ service_errors = ((key, roots_map[key].last_result()['error'])
+ for key in sorted_roots)
if not roots_map:
raise arvados.errors.KeepReadError(
"failed to read {}: no Keep services available ({})".format(
loc_s, loop.last_result()))
- elif not_founds == len(all_roots):
+ elif not_founds == len(sorted_roots):
raise arvados.errors.NotFoundError(
"{} not found".format(loc_s), service_errors)
else:
raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s), service_errors)
+ "failed to read {}".format(loc_s), service_errors, label="service")
@retry.retry_method
def put(self, data, copies=2, num_retries=None):
exponential backoff. The default value is set when the
KeepClient is initialized.
"""
+
+ if isinstance(data, unicode):
+ data = data.encode("ascii")
+ elif not isinstance(data, str):
+ raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+
+ self.put_counter.add(1)
+
data_hash = hashlib.md5(data).hexdigest()
+ loc_s = data_hash + '+' + str(len(data))
if copies < 1:
- return data_hash
+ return loc_s
+ locator = KeepLocator(loc_s)
headers = {}
- if self.using_proxy:
- # Tell the proxy how many copies we want it to store
- headers['X-Keep-Desired-Replication'] = str(copies)
+ # Tell the proxy how many copies we want it to store
+ headers['X-Keep-Desired-Replicas'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
+ # Running total of copies written across all attempts.
+ done = 0
for tries_left in loop:
try:
- local_roots = self.map_new_services(
- roots_map, data_hash,
- force_rebuild=(tries_left < num_retries), **headers)
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
except Exception as error:
loop.save_result(error)
continue
- threads = []
- for service_root, ks in roots_map.iteritems():
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ data_hash=data_hash,
+ copies=copies - done,
+ max_service_replicas=self.max_replicas_per_service,
+ timeout=self.current_timeout(num_retries - tries_left))
+ for service_root, ks in [(root, roots_map[root])
+ for root in sorted_roots]:
if ks.finished():
continue
- t = KeepClient.KeepWriterThread(
- ks,
- data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- timeout=self.current_timeout())
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
- loop.save_result((thread_limiter.done() >= copies, len(threads)))
+ writer_pool.add_task(ks, service_root)
+ writer_pool.join()
+ done += writer_pool.done()
+ loop.save_result((done >= copies, writer_pool.total_task_nr))
if loop.success():
- return thread_limiter.response()
+ return writer_pool.response()
if not roots_map:
raise arvados.errors.KeepWriteError(
"failed to write {}: no Keep services available ({})".format(
data_hash, loop.last_result()))
else:
- service_errors = ((key, roots_map[key].last_result)
- for key in local_roots
- if not roots_map[key].success_flag)
+ service_errors = ((key, roots_map[key].last_result()['error'])
+ for key in sorted_roots
+ if roots_map[key].last_result()['error'])
+ # Report the cumulative total: a new writer_pool is built on
+ # every attempt, so writer_pool.done() would only cover the
+ # final attempt and under-report copies written earlier.
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
- data_hash, copies, thread_limiter.done()), service_errors)
+ data_hash, copies, done), service_errors, label="service")
+
+ def local_store_put(self, data, copies=1, num_retries=None):
+ """A stub for put().
+
+ This method is used in place of the real put() method when
+ using local storage (see constructor's local_store argument).
- # Local storage methods need no-op num_retries arguments to keep
- # integration tests happy. With better isolation they could
- # probably be removed again.
- def local_store_put(self, data, num_retries=0):
+ copies and num_retries arguments are ignored: they are here
+ only for the sake of offering the same call signature as
+ put().
+
+ Data stored this way can be retrieved via local_store_get().
+ """
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s, num_retries=0):
+ def local_store_get(self, loc_s, num_retries=None):
+ """Companion to local_store_put()."""
try:
locator = KeepLocator(loc_s)
except ValueError:
return ''
with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
return f.read()
+
+ def is_cached(self, locator):
+ """Return the result of block_cache.reserve_cache() for locator.
+
+ The GET path in this class unpacks that result as a
+ (slot, first) pair.
+ """
+ # NOTE(review): despite the name, this returns reserve_cache()'s
+ # result rather than a plain boolean, and the GET path keys the
+ # cache by md5sum -- confirm what callers pass and expect.
+ return self.block_cache.reserve_cache(locator)