From e9c78ef7855e7ae263fe461e069c89ff7fc0b798 Mon Sep 17 00:00:00 2001 From: sguthrie Date: Tue, 10 Nov 2015 15:23:18 -0500 Subject: [PATCH 1/1] Closes #7235. Instead of setting KeepService's pycurl.TIMEOUT_MS, set pycurl.LOW_SPEED_LIMIT and pycurl.LOW_SPEED_TIME. Default LOW_SPEED_LIMIT is 32768 bytes per second. Default LOW_SPEED_TIME is 64 seconds. If the user specifies a length-two tuple, the first item sets CONNECTTIMEOUT_MS, the second item sets LOW_SPEED_TIME, and LOW_SPEED_LIMIT is set to 32768 bytes per second. Added bandwidth simulator to keepstub, which uses millisecond precision (like curl) to measure timeouts. Added tests to test_keep_client and modified existing tests to only use integers. --- sdk/python/arvados/keep.py | 50 ++++++---- sdk/python/tests/keepstub.py | 69 +++++++++++-- sdk/python/tests/test_keep_client.py | 139 +++++++++++++++++++++------ 3 files changed, 203 insertions(+), 55 deletions(-) diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index ec9f6f6422..e01fec412b 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -213,11 +213,13 @@ class KeepBlockCache(object): class KeepClient(object): # Default Keep server connection timeout: 2 seconds - # Default Keep server read timeout: 300 seconds + # Default Keep server read timeout: 64 seconds + # Default Keep server bandwidth minimum: 32768 bytes per second # Default Keep proxy connection timeout: 20 seconds - # Default Keep proxy read timeout: 300 seconds - DEFAULT_TIMEOUT = (2, 300) - DEFAULT_PROXY_TIMEOUT = (20, 300) + # Default Keep proxy read timeout: 64 seconds + # Default Keep proxy bandwidth minimum: 32768 bytes per second + DEFAULT_TIMEOUT = (2, 64, 32768) + DEFAULT_PROXY_TIMEOUT = (20, 64, 32768) class ThreadLimiter(object): """Limit the number of threads writing to Keep at once. 
@@ -478,11 +480,17 @@ class KeepClient(object): if not timeouts: return elif isinstance(timeouts, tuple): - conn_t, xfer_t = timeouts + if len(timeouts) == 2: + conn_t, xfer_t = timeouts + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] + else: + conn_t, xfer_t, bandwidth_bps = timeouts else: conn_t, xfer_t = (timeouts, timeouts) + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000)) - curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000)) + curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t))) + curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps))) def _headerfunction(self, header_line): header_line = header_line.decode('iso-8859-1') @@ -586,20 +594,22 @@ class KeepClient(object): :timeout: The initial timeout (in seconds) for HTTP requests to Keep - non-proxy servers. A tuple of two floats is interpreted as - (connection_timeout, read_timeout): see - http://docs.python-requests.org/en/latest/user/advanced/#timeouts. - Because timeouts are often a result of transient server load, the - actual connection timeout will be increased by a factor of two on - each retry. - Default: (2, 300). + non-proxy servers. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). A connection + will be aborted if the average traffic rate falls below + minimum_bandwidth bytes per second over an interval of read_timeout + seconds. Because timeouts are often a result of transient server + load, the actual connection timeout will be increased by a factor + of two on each retry. + Default: (2, 64, 32768). :proxy_timeout: The initial timeout (in seconds) for HTTP requests to - Keep proxies. A tuple of two floats is interpreted as - (connection_timeout, read_timeout). The behavior described - above for adjusting connection timeouts on retry also applies. - Default: (20, 300). + Keep proxies. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). 
The behavior + described above for adjusting connection timeouts on retry also + applies. + Default: (20, 64, 32768). :api_token: If you're not using an API client, but only talking @@ -686,8 +696,10 @@ class KeepClient(object): # TODO(twp): the timeout should be a property of a # KeepService, not a KeepClient. See #4488. t = self.proxy_timeout if self.using_proxy else self.timeout - return (t[0] * (1 << attempt_number), t[1]) - + if len(t) == 2: + return (t[0] * (1 << attempt_number), t[1]) + else: + return (t[0] * (1 << attempt_number), t[1], t[2]) def _any_nondisk_services(self, service_list): return any(ks.get('service_type', 'disk') != 'disk' for ks in service_list) diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py index ef724ed5c5..f074f8d6cf 100644 --- a/sdk/python/tests/keepstub.py +++ b/sdk/python/tests/keepstub.py @@ -22,7 +22,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object): 'response_body': 0, # before returning from handler (thus setting response EOF) 'response_close': 0, + # after writing over 1s worth of data at self.bandwidth + 'mid_write': 0, + # after reading over 1s worth of data at self.bandwidth + 'mid_read': 0, } + self.bandwidth = None super(Server, self).__init__(*args, **kwargs) def setdelays(self, **kwargs): @@ -31,6 +36,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object): self.delays.get(k) # NameError if unknown key self.delays[k] = v + def setbandwidth(self, bandwidth): + """For future requests, set the maximum bandwidth (number of bytes per + second) to operate at. 
If setbandwidth is never called, function at + maximum bandwidth possible""" + self.bandwidth = float(bandwidth) + def _sleep_at_least(self, seconds): """Sleep for given time, even if signals are received.""" wake = time.time() + seconds @@ -44,6 +55,53 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object): class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object): + def wfile_bandwidth_write(self, data_to_write): + if self.server.bandwidth == None and self.server.delays['mid_write'] == 0: + self.wfile.write(data_to_write) + else: + BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768 + outage_happened = False + num_bytes = len(data_to_write) + num_sent_bytes = 0 + target_time = time.time() + while num_sent_bytes < num_bytes: + if num_sent_bytes > self.server.bandwidth and not outage_happened: + self.server._do_delay('mid_write') + target_time += self.delays['mid_write'] + outage_happened = True + num_write_bytes = min(BYTES_PER_WRITE, + num_bytes - num_sent_bytes) + self.wfile.write(data_to_write[ + num_sent_bytes:num_sent_bytes+num_write_bytes]) + num_sent_bytes += num_write_bytes + if self.server.bandwidth is not None: + target_time += num_write_bytes / self.server.bandwidth + self.server._sleep_at_least(target_time - time.time()) + return None + + def rfile_bandwidth_read(self, bytes_to_read): + if self.server.bandwidth == None and self.server.delays['mid_read'] == 0: + return self.rfile.read(bytes_to_read) + else: + BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768 + data = '' + outage_happened = False + bytes_read = 0 + target_time = time.time() + while bytes_to_read > bytes_read: + if bytes_read > self.server.bandwidth and not outage_happened: + self.server._do_delay('mid_read') + target_time += self.delays['mid_read'] + outage_happened = True + next_bytes_to_read = min(BYTES_PER_READ, + bytes_to_read - bytes_read) + data += self.rfile.read(next_bytes_to_read) + bytes_read += next_bytes_to_read + if 
self.server.bandwidth is not None: + target_time += next_bytes_to_read / self.server.bandwidth + self.server._sleep_at_least(target_time - time.time()) + return data + def handle(self, *args, **kwargs): self.server._do_delay('request') return super(Handler, self).handle(*args, **kwargs) @@ -60,21 +118,18 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object): self.send_header('Content-type', 'application/octet-stream') self.end_headers() self.server._do_delay('response_body') - self.wfile.write(self.server.store[datahash]) + self.wfile_bandwidth_write(self.server.store[datahash]) self.server._do_delay('response_close') def do_PUT(self): self.server._do_delay('request_body') - # The comments at https://bugs.python.org/issue1491 implies that Python # 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but # reading the actual code that ships in Debian it clearly is not, so we # need to send the response on the socket directly. - - self.wfile.write("%s %d %s\r\n\r\n" % + self.wfile_bandwidth_write("%s %d %s\r\n\r\n" % (self.protocol_version, 100, "Continue")) - - data = self.rfile.read(int(self.headers.getheader('content-length'))) + data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length'))) datahash = hashlib.md5(data).hexdigest() self.server.store[datahash] = data self.server._do_delay('response') @@ -82,7 +137,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object): self.send_header('Content-type', 'text/plain') self.end_headers() self.server._do_delay('response_body') - self.wfile.write(datahash + '+' + str(len(data))) + self.wfile_bandwidth_write(datahash + '+' + str(len(data))) self.server._do_delay('response_close') def log_request(self, *args, **kwargs): diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py index ada0dac80e..52eb0e9915 100644 --- a/sdk/python/tests/test_keep_client.py +++ b/sdk/python/tests/test_keep_client.py @@ -287,8 +287,11 @@ class 
KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000)) self.assertEqual( - mock.responses[0].getopt(pycurl.TIMEOUT_MS), - int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000)) + mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), + int(arvados.KeepClient.DEFAULT_TIMEOUT[1])) + self.assertEqual( + mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), + int(arvados.KeepClient.DEFAULT_TIMEOUT[2])) def test_put_timeout(self): api_client = self.mock_keep_services(count=1) @@ -301,8 +304,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000)) self.assertEqual( - mock.responses[0].getopt(pycurl.TIMEOUT_MS), - int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000)) + mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), + int(arvados.KeepClient.DEFAULT_TIMEOUT[1])) + self.assertEqual( + mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), + int(arvados.KeepClient.DEFAULT_TIMEOUT[2])) def test_proxy_get_timeout(self): api_client = self.mock_keep_services(service_type='proxy', count=1) @@ -315,8 +321,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000)) self.assertEqual( - mock.responses[0].getopt(pycurl.TIMEOUT_MS), - int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000)) + mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), + int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1])) + self.assertEqual( + mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), + int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2])) def test_proxy_put_timeout(self): api_client = self.mock_keep_services(service_type='proxy', count=1) @@ -329,8 +338,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), 
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000)) self.assertEqual( - mock.responses[0].getopt(pycurl.TIMEOUT_MS), - int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000)) + mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), + int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1])) + self.assertEqual( + mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), + int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2])) def check_no_services_error(self, verb, exc_class): api_client = mock.MagicMock(name='api_client') @@ -570,7 +582,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock): class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock): - DATA = 'x' * 2**10 + # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer + # 1s worth of data and then trigger bandwidth errors before running + # out of data. + DATA = 'x'*2**11 + BANDWIDTH_LOW_LIM = 1024 + TIMEOUT_TIME = 1.0 class assertTakesBetween(unittest.TestCase): def __init__(self, tmin, tmax): @@ -581,8 +598,22 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock): self.t0 = time.time() def __exit__(self, *args, **kwargs): - self.assertGreater(time.time() - self.t0, self.tmin) - self.assertLess(time.time() - self.t0, self.tmax) + # Round times to milliseconds, like CURL. Otherwise, we + # fail when CURL reaches a 1s timeout at 0.9998s. 
+ delta = round(time.time() - self.t0, 3) + self.assertGreaterEqual(delta, self.tmin) + self.assertLessEqual(delta, self.tmax) + + class assertTakesGreater(unittest.TestCase): + def __init__(self, tmin): + self.tmin = tmin + + def __enter__(self): + self.t0 = time.time() + + def __exit__(self, *args, **kwargs): + delta = round(time.time() - self.t0, 3) + self.assertGreaterEqual(delta, self.tmin) def setUp(self): sock = socket.socket() @@ -602,7 +633,7 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock): def tearDown(self): self.server.shutdown() - def keepClient(self, timeouts=(0.1, 1.0)): + def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)): return arvados.KeepClient( api_client=self.api_client, timeout=timeouts) @@ -617,39 +648,89 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock): ) with self.assertTakesBetween(0.1, 0.5): with self.assertRaises(arvados.errors.KeepWriteError): - self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0) + self.keepClient().put(self.DATA, copies=1, num_retries=0) + + def test_low_bandwidth_no_delays_success(self): + self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM) + kc = self.keepClient() + loc = kc.put(self.DATA, copies=1, num_retries=0) + self.assertEqual(self.DATA, kc.get(loc, num_retries=0)) + + def test_too_low_bandwidth_no_delays_failure(self): + # Check that lessening bandwidth corresponds to failing + kc = self.keepClient() + loc = kc.put(self.DATA, copies=1, num_retries=0) + self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM) + with self.assertTakesGreater(self.TIMEOUT_TIME): + with self.assertRaises(arvados.errors.KeepReadError) as e: + kc.get(loc, num_retries=0) + with self.assertTakesGreater(self.TIMEOUT_TIME): + with self.assertRaises(arvados.errors.KeepWriteError): + kc.put(self.DATA, copies=1, num_retries=0) + + def test_low_bandwidth_with_server_response_delay_failure(self): + kc = self.keepClient() + loc = kc.put(self.DATA, copies=1, num_retries=0) + 
self.server.setbandwidth(self.BANDWIDTH_LOW_LIM) + self.server.setdelays(response=self.TIMEOUT_TIME) + with self.assertTakesGreater(self.TIMEOUT_TIME): + with self.assertRaises(arvados.errors.KeepReadError) as e: + kc.get(loc, num_retries=0) + with self.assertTakesGreater(self.TIMEOUT_TIME): + with self.assertRaises(arvados.errors.KeepWriteError): + kc.put(self.DATA, copies=1, num_retries=0) + + def test_low_bandwidth_with_server_mid_delay_failure(self): + kc = self.keepClient() + loc = kc.put(self.DATA, copies=1, num_retries=0) + self.server.setbandwidth(self.BANDWIDTH_LOW_LIM) + self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME) + with self.assertTakesGreater(self.TIMEOUT_TIME): + with self.assertRaises(arvados.errors.KeepReadError) as e: + kc.get(loc, num_retries=0) + with self.assertTakesGreater(self.TIMEOUT_TIME): + with self.assertRaises(arvados.errors.KeepWriteError): + kc.put(self.DATA, copies=1, num_retries=0) def test_timeout_slow_request(self): - self.server.setdelays(request=0.2) - self._test_200ms() + loc = self.keepClient().put(self.DATA, copies=1, num_retries=0) + self.server.setdelays(request=.2) + self._test_connect_timeout_under_200ms(loc) + self.server.setdelays(request=2) + self._test_response_timeout_under_2s(loc) def test_timeout_slow_response(self): - self.server.setdelays(response=0.2) - self._test_200ms() + loc = self.keepClient().put(self.DATA, copies=1, num_retries=0) + self.server.setdelays(response=.2) + self._test_connect_timeout_under_200ms(loc) + self.server.setdelays(response=2) + self._test_response_timeout_under_2s(loc) def test_timeout_slow_response_body(self): - self.server.setdelays(response_body=0.2) - self._test_200ms() - - def _test_200ms(self): - """Connect should be t<100ms, request should be 200ms <= t < 300ms""" + loc = self.keepClient().put(self.DATA, copies=1, num_retries=0) + self.server.setdelays(response_body=.2) + self._test_connect_timeout_under_200ms(loc) + 
self.server.setdelays(response_body=2) + self._test_response_timeout_under_2s(loc) + def _test_connect_timeout_under_200ms(self, loc): # Allow 100ms to connect, then 1s for response. Everything # should work, and everything should take at least 200ms to # return. - kc = self.keepClient((.1, 1)) + kc = self.keepClient(timeouts=(.1, 1)) with self.assertTakesBetween(.2, .3): - loc = kc.put(self.DATA, copies=1, num_retries=0) + kc.put(self.DATA, copies=1, num_retries=0) with self.assertTakesBetween(.2, .3): self.assertEqual(self.DATA, kc.get(loc, num_retries=0)) - # Allow 1s to connect, then 100ms for response. Nothing should - # work, and everything should take at least 100ms to return. - kc = self.keepClient((1, .1)) - with self.assertTakesBetween(.1, .2): + def _test_response_timeout_under_2s(self, loc): + # Allow 10s to connect, then 1s for response. Nothing should + # work, and everything should take at least 1s to return. + kc = self.keepClient(timeouts=(10, 1)) + with self.assertTakesBetween(1, 1.9): with self.assertRaises(arvados.errors.KeepReadError): kc.get(loc, num_retries=0) - with self.assertTakesBetween(.1, .2): + with self.assertTakesBetween(1, 1.9): with self.assertRaises(arvados.errors.KeepWriteError): kc.put(self.DATA, copies=1, num_retries=0) -- 2.30.2