class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 300 seconds
+ # Default Keep server read timeout: 64 seconds
+ # Default Keep server bandwidth minimum: 32768 bytes per second
# Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 300 seconds
- DEFAULT_TIMEOUT = (2, 300)
- DEFAULT_PROXY_TIMEOUT = (20, 300)
+ # Default Keep proxy read timeout: 64 seconds
+ # Default Keep proxy bandwidth minimum: 32768 bytes per second
+ DEFAULT_TIMEOUT = (2, 64, 32768)
+ DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
class ThreadLimiter(object):
"""Limit the number of threads writing to Keep at once.
if not timeouts:
return
elif isinstance(timeouts, tuple):
- conn_t, xfer_t = timeouts
+ if len(timeouts) == 2:
+ conn_t, xfer_t = timeouts
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+ else:
+ conn_t, xfer_t, bandwidth_bps = timeouts
else:
conn_t, xfer_t = (timeouts, timeouts)
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
header_line = header_line.decode('iso-8859-1')
:timeout:
The initial timeout (in seconds) for HTTP requests to Keep
- non-proxy servers. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout): see
- http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
- Because timeouts are often a result of transient server load, the
- actual connection timeout will be increased by a factor of two on
- each retry.
- Default: (2, 300).
+ non-proxy servers. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). A connection
+ will be aborted if the average traffic rate falls below
+ minimum_bandwidth bytes per second over an interval of read_timeout
+ seconds. A tuple of two floats (connection_timeout, read_timeout)
+ is also accepted, in which case minimum_bandwidth defaults to
+ 32768 bytes per second. Because timeouts are often a result of
+ transient server load, the actual connection timeout will be
+ increased by a factor of two on each retry.
+ Default: (2, 64, 32768).
:proxy_timeout:
The initial timeout (in seconds) for HTTP requests to
- Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). The behavior described
- above for adjusting connection timeouts on retry also applies.
- Default: (20, 300).
+ Keep proxies. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+ described above for adjusting connection timeouts on retry also
+ applies.
+ Default: (20, 64, 32768).
:api_token:
If you're not using an API client, but only talking
# TODO(twp): the timeout should be a property of a
# KeepService, not a KeepClient. See #4488.
t = self.proxy_timeout if self.using_proxy else self.timeout
- return (t[0] * (1 << attempt_number), t[1])
-
+ if len(t) == 2:
+ return (t[0] * (1 << attempt_number), t[1])
+ else:
+ return (t[0] * (1 << attempt_number), t[1], t[2])
def _any_nondisk_services(self, service_list):
return any(ks.get('service_type', 'disk') != 'disk'
for ks in service_list)
'response_body': 0,
# before returning from handler (thus setting response EOF)
'response_close': 0,
+ # after writing over 1s worth of data at self.bandwidth
+ 'mid_write': 0,
+ # after reading over 1s worth of data at self.bandwidth
+ 'mid_read': 0,
}
+ self.bandwidth = None
super(Server, self).__init__(*args, **kwargs)
def setdelays(self, **kwargs):
self.delays.get(k) # NameError if unknown key
self.delays[k] = v
+    def setbandwidth(self, bandwidth):
+        """Cap the transfer rate used for future requests.
+
+        bandwidth is the maximum number of bytes per second to operate
+        at; it is stored as a float. If setbandwidth is never called,
+        the server functions at the maximum bandwidth possible.
+        """
+        bytes_per_second = float(bandwidth)
+        self.bandwidth = bytes_per_second
+
def _sleep_at_least(self, seconds):
"""Sleep for given time, even if signals are received."""
wake = time.time() + seconds
class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+    def wfile_bandwidth_write(self, data_to_write):
+        """Write data_to_write to the client, optionally throttled.
+
+        If the server has neither a bandwidth limit nor a 'mid_write'
+        delay configured, the data is written in a single call.
+        Otherwise it is written in chunks:
+
+        * With a bandwidth limit, chunks are a quarter of a second's
+          worth of data, and the handler sleeps between chunks so the
+          average rate does not exceed self.server.bandwidth bytes per
+          second. The 'mid_write' delay is injected once, after more
+          than one second's worth of data has been written.
+        * With only a 'mid_write' delay (bandwidth is None), chunks are
+          32768 bytes and the delay is injected once, after the first
+          chunk.
+
+        Always returns None.
+        """
+        bandwidth = self.server.bandwidth
+        mid_delay = self.server.delays['mid_write']
+        if bandwidth is None and mid_delay == 0:
+            self.wfile.write(data_to_write)
+            return None
+        if bandwidth is None:
+            # Delay-only mode: there is no rate from which to derive a
+            # chunk size or a "1s worth of data" threshold.
+            bytes_per_write = 32768
+            outage_threshold = 0
+        else:
+            bytes_per_write = int(bandwidth / 4.0) or 32768
+            outage_threshold = bandwidth
+        outage_happened = False
+        num_bytes = len(data_to_write)
+        num_sent_bytes = 0
+        target_time = time.time()
+        while num_sent_bytes < num_bytes:
+            if num_sent_bytes > outage_threshold and not outage_happened:
+                self.server._do_delay('mid_write')
+                # The delay table belongs to the server (see the guard
+                # above); push the pacing schedule back by the outage.
+                target_time += mid_delay
+                outage_happened = True
+            num_write_bytes = min(bytes_per_write,
+                                  num_bytes - num_sent_bytes)
+            self.wfile.write(data_to_write[
+                num_sent_bytes:num_sent_bytes+num_write_bytes])
+            num_sent_bytes += num_write_bytes
+            if bandwidth is not None:
+                target_time += num_write_bytes / bandwidth
+                self.server._sleep_at_least(target_time - time.time())
+        return None
+
+    def rfile_bandwidth_read(self, bytes_to_read):
+        """Read up to bytes_to_read bytes from the client, optionally throttled.
+
+        If the server has neither a bandwidth limit nor a 'mid_read'
+        delay configured, this is a single self.rfile.read() call.
+        Otherwise the body is read in chunks:
+
+        * With a bandwidth limit, chunks are a quarter of a second's
+          worth of data, and the handler sleeps between chunks so the
+          average rate does not exceed self.server.bandwidth bytes per
+          second. The 'mid_read' delay is injected once, after more
+          than one second's worth of data has been read.
+        * With only a 'mid_read' delay (bandwidth is None), chunks are
+          32768 bytes and the delay is injected once, after the first
+          chunk.
+
+        Returns the data read. This is shorter than bytes_to_read only
+        if the stream ends early.
+        """
+        bandwidth = self.server.bandwidth
+        mid_delay = self.server.delays['mid_read']
+        if bandwidth is None and mid_delay == 0:
+            return self.rfile.read(bytes_to_read)
+        if bandwidth is None:
+            # Delay-only mode: there is no rate from which to derive a
+            # chunk size or a "1s worth of data" threshold.
+            bytes_per_read = 32768
+            outage_threshold = 0
+        else:
+            bytes_per_read = int(bandwidth / 4.0) or 32768
+            outage_threshold = bandwidth
+        chunks = []
+        outage_happened = False
+        bytes_read = 0
+        target_time = time.time()
+        while bytes_to_read > bytes_read:
+            if bytes_read > outage_threshold and not outage_happened:
+                self.server._do_delay('mid_read')
+                # The delay table belongs to the server (see the guard
+                # above); push the pacing schedule back by the outage.
+                target_time += mid_delay
+                outage_happened = True
+            next_bytes_to_read = min(bytes_per_read,
+                                     bytes_to_read - bytes_read)
+            chunk = self.rfile.read(next_bytes_to_read)
+            if not chunk:
+                # End of stream before the full body arrived.
+                break
+            chunks.append(chunk)
+            # Count what actually arrived, not what was requested, so a
+            # short read does not silently truncate the body.
+            bytes_read += len(chunk)
+            if bandwidth is not None:
+                target_time += len(chunk) / bandwidth
+                self.server._sleep_at_least(target_time - time.time())
+        return b''.join(chunks)
+
def handle(self, *args, **kwargs):
self.server._do_delay('request')
return super(Handler, self).handle(*args, **kwargs)
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
self.server._do_delay('response_body')
- self.wfile.write(self.server.store[datahash])
+ self.wfile_bandwidth_write(self.server.store[datahash])
self.server._do_delay('response_close')
def do_PUT(self):
self.server._do_delay('request_body')
-
# The comments at https://bugs.python.org/issue1491 implies that Python
# 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
# reading the actual code that ships in Debian it clearly is not, so we
# need to send the response on the socket directly.
-
- self.wfile.write("%s %d %s\r\n\r\n" %
+ self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
(self.protocol_version, 100, "Continue"))
-
- data = self.rfile.read(int(self.headers.getheader('content-length')))
+ data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
datahash = hashlib.md5(data).hexdigest()
self.server.store[datahash] = data
self.server._do_delay('response')
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.server._do_delay('response_body')
- self.wfile.write(datahash + '+' + str(len(data)))
+ self.wfile_bandwidth_write(datahash + '+' + str(len(data)))
self.server._do_delay('response_close')
def log_request(self, *args, **kwargs):
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
def test_put_timeout(self):
api_client = self.mock_keep_services(count=1)
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
def test_proxy_get_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
def test_proxy_put_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
def check_no_services_error(self, verb, exc_class):
api_client = mock.MagicMock(name='api_client')
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
- DATA = 'x' * 2**10
+ # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
+ # 1s worth of data and then trigger bandwidth errors before running
+ # out of data.
+ DATA = 'x'*2**11
+ BANDWIDTH_LOW_LIM = 1024
+ TIMEOUT_TIME = 1.0
class assertTakesBetween(unittest.TestCase):
def __init__(self, tmin, tmax):
self.t0 = time.time()
def __exit__(self, *args, **kwargs):
- self.assertGreater(time.time() - self.t0, self.tmin)
- self.assertLess(time.time() - self.t0, self.tmax)
+ # Round times to milliseconds, like CURL. Otherwise, we
+ # fail when CURL reaches a 1s timeout at 0.9998s.
+ delta = round(time.time() - self.t0, 3)
+ self.assertGreaterEqual(delta, self.tmin)
+ self.assertLessEqual(delta, self.tmax)
+
+    class assertTakesGreater(unittest.TestCase):
+        # Context manager: fails unless the enclosed block ran for at
+        # least tmin seconds.
+        def __init__(self, tmin):
+            self.tmin = tmin
+
+        def __enter__(self):
+            self.t0 = time.time()
+
+        def __exit__(self, *args, **kwargs):
+            # Compare at millisecond resolution.
+            elapsed = time.time() - self.t0
+            self.assertGreaterEqual(round(elapsed, 3), self.tmin)
def setUp(self):
sock = socket.socket()
def tearDown(self):
self.server.shutdown()
- def keepClient(self, timeouts=(0.1, 1.0)):
+ def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
timeout=timeouts)
)
with self.assertTakesBetween(0.1, 0.5):
with self.assertRaises(arvados.errors.KeepWriteError):
- self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+ self.keepClient().put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_no_delays_success(self):
+        # A server throttled to twice the client's minimum bandwidth
+        # should still complete both a put and a get.
+        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
+        client = self.keepClient()
+        locator = client.put(self.DATA, copies=1, num_retries=0)
+        fetched = client.get(locator, num_retries=0)
+        self.assertEqual(self.DATA, fetched)
+
+    def test_too_low_bandwidth_no_delays_failure(self):
+        # Check that lessening bandwidth corresponds to failing
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        # Throttle the server to half the client's minimum bandwidth
+        # only after the block is stored, so the setup put succeeds.
+        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_response_delay_failure(self):
+        # A server that stalls before responding should trip the
+        # client's low-speed abort for both get and put.
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        # Stall for longer than the client's low-speed window. A stall
+        # of exactly TIMEOUT_TIME sits on the boundary of what the
+        # client can detect, which makes the outcome timing-dependent.
+        self.server.setdelays(response=self.TIMEOUT_TIME+1)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_mid_delay_failure(self):
+        # A server that stalls part-way through a transfer should trip
+        # the client's low-speed abort for both get and put.
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        # Stall for longer than the client's low-speed window. A stall
+        # of exactly TIMEOUT_TIME sits on the boundary of what the
+        # client can detect, which makes the outcome timing-dependent.
+        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1,
+                              mid_read=self.TIMEOUT_TIME+1)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
def test_timeout_slow_request(self):
- self.server.setdelays(request=0.2)
- self._test_200ms()
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(request=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(request=2)
+ self._test_response_timeout_under_2s(loc)
def test_timeout_slow_response(self):
- self.server.setdelays(response=0.2)
- self._test_200ms()
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(response=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(response=2)
+ self._test_response_timeout_under_2s(loc)
def test_timeout_slow_response_body(self):
- self.server.setdelays(response_body=0.2)
- self._test_200ms()
-
- def _test_200ms(self):
- """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(response_body=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(response_body=2)
+ self._test_response_timeout_under_2s(loc)
+ def _test_connect_timeout_under_200ms(self, loc):
# Allow 100ms to connect, then 1s for response. Everything
# should work, and everything should take at least 200ms to
# return.
- kc = self.keepClient((.1, 1))
+ kc = self.keepClient(timeouts=(.1, 1))
with self.assertTakesBetween(.2, .3):
- loc = kc.put(self.DATA, copies=1, num_retries=0)
+ kc.put(self.DATA, copies=1, num_retries=0)
with self.assertTakesBetween(.2, .3):
self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
- # Allow 1s to connect, then 100ms for response. Nothing should
- # work, and everything should take at least 100ms to return.
- kc = self.keepClient((1, .1))
- with self.assertTakesBetween(.1, .2):
+ def _test_response_timeout_under_2s(self, loc):
+ # Allow 10s to connect, then 1s for response. Nothing should
+ # work, and everything should take at least 1s to return.
+ kc = self.keepClient(timeouts=(10, 1))
+ with self.assertTakesBetween(1, 1.9):
with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
- with self.assertTakesBetween(.1, .2):
+ with self.assertTakesBetween(1, 1.9):
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)