X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f34c10a5b22e3f9941f2cfe60f6f9d7a81319b6f..787fdb3943a9189486fc0ad95a460180a2469e31:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 0d5eeb37fe..cd39f83703 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -148,6 +148,8 @@ class KeepBlockCache(object): self._cache_lock = threading.Lock() class CacheSlot(object): + __slots__ = ("locator", "ready", "content") + def __init__(self, locator): self.locator = locator self.ready = threading.Event() @@ -228,11 +230,13 @@ class Counter(object): class KeepClient(object): # Default Keep server connection timeout: 2 seconds - # Default Keep server read timeout: 300 seconds + # Default Keep server read timeout: 256 seconds + # Default Keep server bandwidth minimum: 32768 bytes per second # Default Keep proxy connection timeout: 20 seconds - # Default Keep proxy read timeout: 300 seconds - DEFAULT_TIMEOUT = (2, 300) - DEFAULT_PROXY_TIMEOUT = (20, 300) + # Default Keep proxy read timeout: 256 seconds + # Default Keep proxy bandwidth minimum: 32768 bytes per second + DEFAULT_TIMEOUT = (2, 256, 32768) + DEFAULT_PROXY_TIMEOUT = (20, 256, 32768) class ThreadLimiter(object): """Limit the number of threads writing to Keep at once. @@ -375,6 +379,7 @@ class KeepClient(object): url = self.root + str(locator) _logger.debug("Request: GET %s", url) curl = self._get_user_agent() + ok = None try: with timer.Timer() as t: self._headers = {} @@ -406,7 +411,6 @@ class KeepClient(object): self._result = { 'error': e, } - ok = False self._usable = ok != False if self._result.get('status_code', None): # The client worked well enough to get an HTTP status @@ -441,6 +445,7 @@ class KeepClient(object): url = self.root + hash_s _logger.debug("Request: PUT %s", url) curl = self._get_user_agent() + ok = None try: with timer.Timer() as t: self._headers = {} @@ -482,7 +487,6 @@ class KeepClient(object): self._result = { 'error': e, } - ok = False self._usable = ok != False # still usable if ok is True or None if self._result.get('status_code', None): # Client is functional. See comment in get(). @@ -506,11 +510,17 @@ class KeepClient(object): if not timeouts: return elif isinstance(timeouts, tuple): - conn_t, xfer_t = timeouts + if len(timeouts) == 2: + conn_t, xfer_t = timeouts + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] + else: + conn_t, xfer_t, bandwidth_bps = timeouts else: conn_t, xfer_t = (timeouts, timeouts) + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000)) - curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000)) + curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t))) + curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps))) def _headerfunction(self, header_line): header_line = header_line.decode('iso-8859-1') @@ -614,20 +624,22 @@ class KeepClient(object): :timeout: The initial timeout (in seconds) for HTTP requests to Keep - non-proxy servers. A tuple of two floats is interpreted as - (connection_timeout, read_timeout): see - http://docs.python-requests.org/en/latest/user/advanced/#timeouts. - Because timeouts are often a result of transient server load, the - actual connection timeout will be increased by a factor of two on - each retry. - Default: (2, 300). + non-proxy servers. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). A connection + will be aborted if the average traffic rate falls below + minimum_bandwidth bytes per second over an interval of read_timeout + seconds. Because timeouts are often a result of transient server + load, the actual connection timeout will be increased by a factor + of two on each retry. + Default: (2, 256, 32768). :proxy_timeout: The initial timeout (in seconds) for HTTP requests to - Keep proxies. A tuple of two floats is interpreted as - (connection_timeout, read_timeout). The behavior described - above for adjusting connection timeouts on retry also applies. - Default: (20, 300). + Keep proxies. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). The behavior + described above for adjusting connection timeouts on retry also + applies. + Default: (20, 256, 32768). :api_token: If you're not using an API client, but only talking @@ -671,6 +683,8 @@ class KeepClient(object): self.download_counter = Counter() self.put_counter = Counter() self.get_counter = Counter() + self.hits_counter = Counter() + self.misses_counter = Counter() if local_store: self.local_store = local_store @@ -718,8 +732,10 @@ class KeepClient(object): # TODO(twp): the timeout should be a property of a # KeepService, not a KeepClient. See #4488. t = self.proxy_timeout if self.using_proxy else self.timeout - return (t[0] * (1 << attempt_number), t[1]) - + if len(t) == 2: + return (t[0] * (1 << attempt_number), t[1]) + else: + return (t[0] * (1 << attempt_number), t[1], t[2]) def _any_nondisk_services(self, service_list): return any(ks.get('service_type', 'disk') != 'disk' for ks in service_list) @@ -883,9 +899,12 @@ class KeepClient(object): locator = KeepLocator(loc_s) slot, first = self.block_cache.reserve_cache(locator.md5sum) if not first: + self.hits_counter.add(1) v = slot.get() return v + self.misses_counter.add(1) + # If the locator has hints specifying a prefix (indicating a # remote keepproxy) or the UUID of a local gateway service, # read data from the indicated service(s) instead of the usual @@ -999,6 +1018,7 @@ class KeepClient(object): roots_map = {} loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) + done = 0 for tries_left in loop: try: sorted_roots = self.map_new_services( @@ -1009,7 +1029,7 @@ class KeepClient(object): continue thread_limiter = KeepClient.ThreadLimiter( - copies, self.max_replicas_per_service) + copies - done, self.max_replicas_per_service) threads = [] for service_root, ks in [(root, roots_map[root]) for root in sorted_roots]: @@ -1027,7 +1047,8 @@ class KeepClient(object): threads.append(t) for t in threads: t.join() - loop.save_result((thread_limiter.done() >= copies, len(threads))) + done += thread_limiter.done() + loop.save_result((done >= copies, len(threads))) if loop.success(): return thread_limiter.response()