-import bz2
+import cStringIO
import datetime
-import fcntl
-import functools
-import gflags
import hashlib
-import json
import logging
+import math
import os
-import pprint
import pycurl
import Queue
import re
import socket
import ssl
-import string
-import cStringIO
-import subprocess
-import sys
import threading
-import time
import timer
-import types
-import UserDict
-import zlib
import arvados
import arvados.config as config
self._cache_lock = threading.Lock()
class CacheSlot(object):
+ __slots__ = ("locator", "ready", "content")
+
def __init__(self, locator):
self.locator = locator
self.ready = threading.Event()
self._cache.insert(0, n)
return n, True
+
class Counter(object):
    """A running total that can be shared between threads.

    Every read and every update of the stored value is performed while
    holding an internal lock.
    """

    def __init__(self, v=0):
        """Start the total at v (0 unless given)."""
        self._val = v
        self._lk = threading.Lock()

    def add(self, v):
        """Increase the total by v."""
        self._lk.acquire()
        try:
            self._val += v
        finally:
            self._lk.release()

    def get(self):
        """Return the current total."""
        self._lk.acquire()
        try:
            current = self._val
        finally:
            self._lk.release()
        return current
+
+
class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 300 seconds
+ # Default Keep server read timeout: 256 seconds
+ # Default Keep server bandwidth minimum: 32768 bytes per second
# Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 300 seconds
- DEFAULT_TIMEOUT = (2, 300)
- DEFAULT_PROXY_TIMEOUT = (20, 300)
+ # Default Keep proxy read timeout: 256 seconds
+ # Default Keep proxy bandwidth minimum: 32768 bytes per second
+ DEFAULT_TIMEOUT = (2, 256, 32768)
+ DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
class ThreadLimiter(object):
    """Limit the number of threads writing to Keep at once.

    This ensures that only a number of writer threads that could
    potentially achieve the desired replication level run at once.
    Once the desired replication level is achieved, queued threads
    are instructed not to run.

    Should be used in a "with" block.
    """

    def __init__(self, want_copies, max_service_replicas):
        """Set up the limiter.

        want_copies -- number of replicas the caller wants stored.
        max_service_replicas -- the most replicas a single service can
            store, or a false value (None, 0) when that is unknown.
        """
        self._started = 0
        self._want_copies = want_copies
        self._done = 0
        self._response = None
        self._start_lock = threading.Condition()
        max_threads = self._max_threads(want_copies, max_service_replicas)
        _logger.debug("Limiter max threads is %d", max_threads)
        self._todo_lock = threading.Semaphore(max_threads)
        self._done_lock = threading.Lock()
        self._local = threading.local()

    @staticmethod
    def _max_threads(want_copies, max_service_replicas):
        """Return how many writer threads may run at the same time.

        The answer is 1 when max_service_replicas is false (unknown) or
        is at least want_copies.  Otherwise it is want_copies divided by
        max_service_replicas, rounded up.  The result is always an int:
        math.ceil() returns a float on Python 2, and a semaphore count
        should not be a float.
        """
        if (not max_service_replicas) or (max_service_replicas >= want_copies):
            return 1
        return int(math.ceil(float(want_copies) / max_service_replicas))

    def __enter__(self):
        """Wait for this thread's turn, then take a writer slot.

        If the calling thread has used set_sequence(N), it first waits
        until N other threads have started.  Returns the limiter itself.
        """
        # The "with" form releases the start lock even if an exception
        # is raised while waiting.
        with self._start_lock:
            sequence = getattr(self._local, 'sequence', None)
            if sequence is not None:
                while self._started < sequence:
                    self._start_lock.wait()
            # The slot is taken while the start lock is still held, so
            # threads entering later queue up behind this one.
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notify_all()
        return self

    def __exit__(self, type, value, traceback):
        """Give the writer slot back.  Exceptions are not suppressed."""
        self._todo_lock.release()

    def set_sequence(self, sequence):
        """Record, for the calling thread only, how many other threads
        must have started before this thread may enter the limiter.
        """
        self._local.sequence = sequence

    def shall_i_proceed(self):
        """
        Return true if the current thread should write to Keep.
        Return false otherwise.
        """
        with self._done_lock:
            return (self._done < self._want_copies)

    def save_response(self, response_body, replicas_stored):
        """
        Records a response body (a locator, possibly signed) returned by
        the Keep server, and the number of replicas it stored.

        The replica count is added to the running total; the stored
        response body is replaced by the most recent one.
        """
        with self._done_lock:
            self._done += replicas_stored
            self._response = response_body

    def response(self):
        """Return the body from the response to a PUT request."""
        with self._done_lock:
            return self._response

    def done(self):
        """Return the total number of replicas successfully stored."""
        with self._done_lock:
            return self._done
-
class KeepService(object):
"""Make requests to a single Keep service, and track results.
arvados.errors.HttpError,
)
- def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
+ def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+ upload_counter=None,
+ download_counter=None, **headers):
self.root = root
self._user_agent_pool = user_agent_pool
self._result = {'error': None}
self.get_headers = {'Accept': 'application/octet-stream'}
self.get_headers.update(headers)
self.put_headers = headers
+ self.upload_counter = upload_counter
+ self.download_counter = download_counter
def usable(self):
"""Is it worth attempting a request?"""
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
curl = self._get_user_agent()
+ ok = None
try:
with timer.Timer() as t:
self._headers = {}
self._result = {
'error': e,
}
- ok = False
self._usable = ok != False
if self._result.get('status_code', None):
# The client worked well enough to get an HTTP status
_logger.debug("Request fail: GET %s => %s: %s",
url, type(self._result['error']), str(self._result['error']))
return None
- _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+ _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
self._result['status_code'],
len(self._result['body']),
t.msecs,
(len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ if self.download_counter:
+ self.download_counter.add(len(self._result['body']))
resp_md5 = hashlib.md5(self._result['body']).hexdigest()
if resp_md5 != locator.md5sum:
_logger.warning("Checksum fail: md5(%s) = %s",
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
curl = self._get_user_agent()
+ ok = None
try:
- self._headers = {}
- body_reader = cStringIO.StringIO(body)
- response_body = cStringIO.StringIO()
- curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
- curl.setopt(pycurl.URL, url.encode('utf-8'))
- # Using UPLOAD tells cURL to wait for a "go ahead" from the
- # Keep server (in the form of a HTTP/1.1 "100 Continue"
- # response) instead of sending the request body immediately.
- # This allows the server to reject the request if the request
- # is invalid or the server is read-only, without waiting for
- # the client to send the entire block.
- curl.setopt(pycurl.UPLOAD, True)
- curl.setopt(pycurl.INFILESIZE, len(body))
- curl.setopt(pycurl.READFUNCTION, body_reader.read)
- curl.setopt(pycurl.HTTPHEADER, [
- '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
- curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
- curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
- self._setcurltimeouts(curl, timeout)
- try:
- curl.perform()
- except Exception as e:
- raise arvados.errors.HttpError(0, str(e))
- self._result = {
- 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
- 'body': response_body.getvalue(),
- 'headers': self._headers,
- 'error': False,
- }
+ with timer.Timer() as t:
+ self._headers = {}
+ body_reader = cStringIO.StringIO(body)
+ response_body = cStringIO.StringIO()
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+ curl.setopt(pycurl.URL, url.encode('utf-8'))
+ # Using UPLOAD tells cURL to wait for a "go ahead" from the
+ # Keep server (in the form of a HTTP/1.1 "100 Continue"
+ # response) instead of sending the request body immediately.
+ # This allows the server to reject the request if the request
+ # is invalid or the server is read-only, without waiting for
+ # the client to send the entire block.
+ curl.setopt(pycurl.UPLOAD, True)
+ curl.setopt(pycurl.INFILESIZE, len(body))
+ curl.setopt(pycurl.READFUNCTION, body_reader.read)
+ curl.setopt(pycurl.HTTPHEADER, [
+ '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+ curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+ curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+ self._setcurltimeouts(curl, timeout)
+ try:
+ curl.perform()
+ except Exception as e:
+ raise arvados.errors.HttpError(0, str(e))
+ self._result = {
+ 'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+ 'body': response_body.getvalue(),
+ 'headers': self._headers,
+ 'error': False,
+ }
ok = retry.check_http_response_success(self._result['status_code'])
if not ok:
self._result['error'] = arvados.errors.HttpError(
self._result = {
'error': e,
}
- ok = False
self._usable = ok != False # still usable if ok is True or None
if self._result.get('status_code', None):
# Client is functional. See comment in get().
_logger.debug("Request fail: PUT %s => %s: %s",
url, type(self._result['error']), str(self._result['error']))
return False
+ _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
+ self._result['status_code'],
+ len(body),
+ t.msecs,
+ (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+ if self.upload_counter:
+ self.upload_counter.add(len(body))
return True
def _setcurltimeouts(self, curl, timeouts):
if not timeouts:
return
elif isinstance(timeouts, tuple):
- conn_t, xfer_t = timeouts
+ if len(timeouts) == 2:
+ conn_t, xfer_t = timeouts
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+ else:
+ conn_t, xfer_t, bandwidth_bps = timeouts
else:
conn_t, xfer_t = (timeouts, timeouts)
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
header_line = header_line.decode('iso-8859-1')
return self._success
def run(self):
- with self.args['thread_limiter'] as limiter:
+ limiter = self.args['thread_limiter']
+ sequence = self.args['thread_sequence']
+ if sequence is not None:
+ limiter.set_sequence(sequence)
+ with limiter:
if not limiter.shall_i_proceed():
# My turn arrived, but the job has been done without
# me.
:timeout:
The initial timeout (in seconds) for HTTP requests to Keep
- non-proxy servers. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout): see
- http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
- Because timeouts are often a result of transient server load, the
- actual connection timeout will be increased by a factor of two on
- each retry.
- Default: (2, 300).
+ non-proxy servers. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). A connection
+ will be aborted if the average traffic rate falls below
+ minimum_bandwidth bytes per second over an interval of read_timeout
+ seconds. Because timeouts are often a result of transient server
+ load, the actual connection timeout will be increased by a factor
+ of two on each retry.
+ Default: (2, 256, 32768).
:proxy_timeout:
The initial timeout (in seconds) for HTTP requests to
- Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). The behavior described
- above for adjusting connection timeouts on retry also applies.
- Default: (20, 300).
+ Keep proxies. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+ described above for adjusting connection timeouts on retry also
+ applies.
+ Default: (20, 256, 32768).
:api_token:
If you're not using an API client, but only talking
self.timeout = timeout
self.proxy_timeout = proxy_timeout
self._user_agent_pool = Queue.LifoQueue()
+ self.upload_counter = Counter()
+ self.download_counter = Counter()
+ self.put_counter = Counter()
+ self.get_counter = Counter()
+ self.hits_counter = Counter()
+ self.misses_counter = Counter()
if local_store:
self.local_store = local_store
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.max_replicas_per_service = None
if proxy:
if not proxy.endswith('/'):
proxy += '/'
self._gateway_services = {}
self._keep_services = [{
'uuid': 'proxy',
+ 'service_type': 'proxy',
'_service_root': proxy,
}]
self._writable_services = self._keep_services
# TODO(twp): the timeout should be a property of a
# KeepService, not a KeepClient. See #4488.
t = self.proxy_timeout if self.using_proxy else self.timeout
- return (t[0] * (1 << attempt_number), t[1])
+ if len(t) == 2:
+ return (t[0] * (1 << attempt_number), t[1])
+ else:
+ return (t[0] * (1 << attempt_number), t[1], t[2])
+ def _any_nondisk_services(self, service_list):
+ return any(ks.get('service_type', 'disk') != 'disk'
+ for ks in service_list)
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- accessible = keep_services.execute().get('items')
- if not accessible:
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks['uuid']: ks for ks in
+ keep_services.execute()['items']}
+ if not self._gateway_services:
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in accessible:
+ for r in self._gateway_services.itervalues():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
host,
r['service_port'])
- # Gateway services are only used when specified by UUID,
- # so there's nothing to gain by filtering them by
- # service_type.
- self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
_logger.debug(str(self._gateway_services))
-
self._keep_services = [
- ks for ks in accessible
- if ks.get('service_type') in ['disk', 'proxy']]
- self._writable_services = [
- ks for ks in accessible
- if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
- _logger.debug(str(self._keep_services))
-
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
+ ks for ks in self._gateway_services.itervalues()
+ if not ks.get('service_type', '').startswith('gateway:')]
+ self._writable_services = [ks for ks in self._keep_services
+ if not ks.get('read_only')]
+
+ # For disk type services, max_replicas_per_service is 1
+ # It is unknown (unlimited) for other service types.
+ if self._any_nondisk_services(self._writable_services):
+ self.max_replicas_per_service = None
+ else:
+ self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
self.build_services_list(force_rebuild)
sorted_roots = []
-
# Use the services indicated by the given +K@... remote
# service hints, if any are present and can be resolved to a
# URI.
# in that order.
use_services = self._keep_services
if need_writable:
- use_services = self._writable_services
+ use_services = self._writable_services
+ self.using_proxy = self._any_nondisk_services(use_services)
sorted_roots.extend([
svc['_service_root'] for svc in sorted(
use_services,
for root in local_roots:
if root not in roots_map:
roots_map[root] = self.KeepService(
- root, self._user_agent_pool, **headers)
+ root, self._user_agent_pool,
+ upload_counter=self.upload_counter,
+ download_counter=self.download_counter,
+ **headers)
return local_roots
@staticmethod
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
+
+ self.get_counter.add(1)
+
locator = KeepLocator(loc_s)
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
+ self.hits_counter.add(1)
v = slot.get()
return v
+ self.misses_counter.add(1)
+
# If the locator has hints specifying a prefix (indicating a
# remote keepproxy) or the UUID of a local gateway service,
# read data from the indicated service(s) instead of the usual
)])
# Map root URLs to their KeepService objects.
roots_map = {
- root: self.KeepService(root, self._user_agent_pool)
+ root: self.KeepService(root, self._user_agent_pool,
+ upload_counter=self.upload_counter,
+ download_counter=self.download_counter)
for root in hint_roots
}
elif not isinstance(data, str):
raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+ self.put_counter.add(1)
+
data_hash = hashlib.md5(data).hexdigest()
loc_s = data_hash + '+' + str(len(data))
if copies < 1:
locator = KeepLocator(loc_s)
headers = {}
- if self.using_proxy:
- # Tell the proxy how many copies we want it to store
- headers['X-Keep-Desired-Replication'] = str(copies)
+ # Tell the proxy how many copies we want it to store
+ headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
+ done = 0
for tries_left in loop:
try:
- local_roots = self.map_new_services(
+ sorted_roots = self.map_new_services(
roots_map, locator,
force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
except Exception as error:
loop.save_result(error)
continue
+ thread_limiter = KeepClient.ThreadLimiter(
+ copies - done, self.max_replicas_per_service)
threads = []
- for service_root, ks in roots_map.iteritems():
+ for service_root, ks in [(root, roots_map[root])
+ for root in sorted_roots]:
if ks.finished():
continue
t = KeepClient.KeepWriterThread(
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.current_timeout(num_retries-tries_left))
+ timeout=self.current_timeout(num_retries-tries_left),
+ thread_sequence=len(threads))
t.start()
threads.append(t)
for t in threads:
t.join()
- loop.save_result((thread_limiter.done() >= copies, len(threads)))
+ done += thread_limiter.done()
+ loop.save_result((done >= copies, len(threads)))
if loop.success():
return thread_limiter.response()
data_hash, loop.last_result()))
else:
service_errors = ((key, roots_map[key].last_result()['error'])
- for key in local_roots
+ for key in sorted_roots
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(