self._cache_lock = threading.Lock()
class CacheSlot(object):
+ __slots__ = ("locator", "ready", "content")
+
def __init__(self, locator):
self.locator = locator
self.ready = threading.Event()
class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 64 seconds
+ # Default Keep server read timeout: 256 seconds
# Default Keep server bandwidth minimum: 32768 bytes per second
# Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 64 seconds
+ # Default Keep proxy read timeout: 256 seconds
# Default Keep proxy bandwidth minimum: 32768 bytes per second
- DEFAULT_TIMEOUT = (2, 64, 32768)
- DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
+ DEFAULT_TIMEOUT = (2, 256, 32768)
+ DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
class ThreadLimiter(object):
"""Limit the number of threads writing to Keep at once.
url = self.root + str(locator)
_logger.debug("Request: GET %s", url)
curl = self._get_user_agent()
+ ok = None
try:
with timer.Timer() as t:
self._headers = {}
self._result = {
'error': e,
}
- ok = False
self._usable = ok != False  # still usable if ok is True or None
if self._result.get('status_code', None):
# The client worked well enough to get an HTTP status
url = self.root + hash_s
_logger.debug("Request: PUT %s", url)
curl = self._get_user_agent()
+ ok = None
try:
with timer.Timer() as t:
self._headers = {}
self._result = {
'error': e,
}
- ok = False
self._usable = ok != False # still usable if ok is True or None
if self._result.get('status_code', None):
# Client is functional. See comment in get().
seconds. Because timeouts are often a result of transient server
load, the actual connection timeout will be increased by a factor
of two on each retry.
- Default: (2, 64, 32768).
+ Default: (2, 256, 32768).
:proxy_timeout:
The initial timeout (in seconds) for HTTP requests to
(connection_timeout, read_timeout, minimum_bandwidth). The behavior
described above for adjusting connection timeouts on retry also
applies.
- Default: (20, 64, 32768).
+ Default: (20, 256, 32768).
:api_token:
If you're not using an API client, but only talking
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
+ done = 0
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
continue
thread_limiter = KeepClient.ThreadLimiter(
- copies, self.max_replicas_per_service)
+ copies - done, self.max_replicas_per_service)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
threads.append(t)
for t in threads:
t.join()
- loop.save_result((thread_limiter.done() >= copies, len(threads)))
+ done += thread_limiter.done()
+ loop.save_result((done >= copies, len(threads)))
if loop.success():
return thread_limiter.response()