self._cache_lock = threading.Lock()
class CacheSlot(object):
+ __slots__ = ("locator", "ready", "content")
+
def __init__(self, locator):
self.locator = locator
self.ready = threading.Event()
class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 300 seconds
+ # Default Keep server read timeout: 256 seconds
+ # Default Keep server bandwidth minimum: 32768 bytes per second
# Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 300 seconds
- DEFAULT_TIMEOUT = (2, 300)
- DEFAULT_PROXY_TIMEOUT = (20, 300)
+ # Default Keep proxy read timeout: 256 seconds
+ # Default Keep proxy bandwidth minimum: 32768 bytes per second
+ DEFAULT_TIMEOUT = (2, 256, 32768)
+ DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
class ThreadLimiter(object):
"""Limit the number of threads writing to Keep at once.
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
curl = self._get_user_agent()
+ ok = None
try:
with timer.Timer() as t:
self._headers = {}
self._result = {
'error': e,
}
- ok = False
self._usable = ok != False
if self._result.get('status_code', None):
# The client worked well enough to get an HTTP status
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
curl = self._get_user_agent()
+ ok = None
try:
with timer.Timer() as t:
self._headers = {}
self._result = {
'error': e,
}
- ok = False
self._usable = ok != False # still usable if ok is True or None
if self._result.get('status_code', None):
# Client is functional. See comment in get().
if not timeouts:
return
elif isinstance(timeouts, tuple):
- conn_t, xfer_t = timeouts
+ if len(timeouts) == 2:
+ conn_t, xfer_t = timeouts
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+ else:
+ conn_t, xfer_t, bandwidth_bps = timeouts
else:
conn_t, xfer_t = (timeouts, timeouts)
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
header_line = header_line.decode('iso-8859-1')
:timeout:
The initial timeout (in seconds) for HTTP requests to Keep
- non-proxy servers. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout): see
- http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
- Because timeouts are often a result of transient server load, the
- actual connection timeout will be increased by a factor of two on
- each retry.
- Default: (2, 300).
+ non-proxy servers. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). A connection
+ will be aborted if the average traffic rate falls below
+ minimum_bandwidth bytes per second over an interval of read_timeout
+ seconds. A tuple of two floats (connection_timeout, read_timeout)
+ is still accepted. Because timeouts are often a result of
+ transient server load, the actual connection timeout will be
+ increased by a factor of two on each retry.
+ Default: (2, 256, 32768).
:proxy_timeout:
The initial timeout (in seconds) for HTTP requests to
- Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). The behavior described
- above for adjusting connection timeouts on retry also applies.
- Default: (20, 300).
+ Keep proxies. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+ described above for adjusting connection timeouts on retry also
+ applies.
+ Default: (20, 256, 32768).
:api_token:
If you're not using an API client, but only talking
self.download_counter = Counter()
self.put_counter = Counter()
self.get_counter = Counter()
+ self.hits_counter = Counter()
+ self.misses_counter = Counter()
if local_store:
self.local_store = local_store
# TODO(twp): the timeout should be a property of a
# KeepService, not a KeepClient. See #4488.
t = self.proxy_timeout if self.using_proxy else self.timeout
- return (t[0] * (1 << attempt_number), t[1])
-
+ if len(t) == 2:
+ return (t[0] * (1 << attempt_number), t[1])
+ else:
+ return (t[0] * (1 << attempt_number), t[1], t[2])
def _any_nondisk_services(self, service_list):
return any(ks.get('service_type', 'disk') != 'disk'
for ks in service_list)
locator = KeepLocator(loc_s)
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
+ self.hits_counter.add(1)
v = slot.get()
return v
+ self.misses_counter.add(1)
+
# If the locator has hints specifying a prefix (indicating a
# remote keepproxy) or the UUID of a local gateway service,
# read data from the indicated service(s) instead of the usual
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
+ done = 0
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
continue
thread_limiter = KeepClient.ThreadLimiter(
- copies, self.max_replicas_per_service)
+ copies - done, self.max_replicas_per_service)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
threads.append(t)
for t in threads:
t.join()
- loop.save_result((thread_limiter.done() >= copies, len(threads)))
+ done += thread_limiter.done()
+ loop.save_result((done >= copies, len(threads)))
if loop.success():
return thread_limiter.response()