From 20b988d5b901f459f95043c3702f1f9b104f3fbf Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 28 Apr 2015 19:08:45 -0400 Subject: [PATCH] 5562: Add fake keepstore server with configurable problems. --- sdk/python/tests/keepstub.py | 95 ++++++++++++++++++++++++++++ sdk/python/tests/test_keep_client.py | 95 +++++++++++++++++++++++++++- 2 files changed, 187 insertions(+), 3 deletions(-) create mode 100644 sdk/python/tests/keepstub.py diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py new file mode 100644 index 0000000000..e84230aaee --- /dev/null +++ b/sdk/python/tests/keepstub.py @@ -0,0 +1,95 @@ +import BaseHTTPServer +import hashlib +import os +import re +import SocketServer +import time + +class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object): + + allow_reuse_address = 1 + + def __init__(self, *args, **kwargs): + self.store = {} + self.delays = { + # before reading request headers + 'request': 0, + # before reading request body + 'request_body': 0, + # before setting response status and headers + 'response': 0, + # before sending response body + 'response_body': 0, + # before returning from handler (thus setting response EOF) + 'response_close': 0, + } + super(Server, self).__init__(*args, **kwargs) + + def setdelays(self, **kwargs): + """In future requests, induce delays at the given checkpoints.""" + for (k, v) in kwargs.iteritems(): + self.delays.get(k) # NameError if unknown key + self.delays[k] = v + + def _sleep_at_least(self, seconds): + """Sleep for given time, even if signals are received.""" + wake = time.time() + seconds + todo = seconds + while todo > 0: + time.sleep(todo) + todo = wake - time.time() + + def _do_delay(self, k): + self._sleep_at_least(self.delays[k]) + + +class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object): + def handle(self, *args, **kwargs): + self.server._do_delay('request') + return super(Handler, self).handle(*args, **kwargs) + + def do_GET(self): + self.server._do_delay('response') + r = re.search(r'[0-9a-f]{32}', self.path) + if not r: + return self.send_response(422) + datahash = r.group(0) + if datahash not in self.server.store: + return self.send_response(404) + self.send_response(200) + self.send_header('Content-type', 'application/octet-stream') + self.end_headers() + self.server._do_delay('response_body') + self.wfile.write(self.server.store[datahash]) + self.server._do_delay('response_close') + + def do_PUT(self): + self.server._do_delay('request_body') + data = self.rfile.read(int(self.headers.getheader('content-length'))) + datahash = hashlib.md5(data).hexdigest() + self.server.store[datahash] = data + self.server._do_delay('response') + self.send_response(200) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.server._do_delay('response_body') + self.wfile.write(datahash + '+' + str(len(data))) + self.server._do_delay('response_close') + + def log_request(self, *args, **kwargs): + if os.environ.get('ARVADOS_DEBUG', None): + super(Handler, self).log_request(*args, **kwargs) + + def finish(self, *args, **kwargs): + """Ignore exceptions, notably "Broken pipe" when client times out.""" + try: + return super(Handler, self).finish(*args, **kwargs) + except: + pass + + def handle_one_request(self, *args, **kwargs): + """Ignore exceptions, notably "Broken pipe" when client times out.""" + try: + return super(Handler, self).handle_one_request(*args, **kwargs) + except: + pass diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py index 0c42c2f18c..419f1ce082 100644 --- a/sdk/python/tests/test_keep_client.py +++ b/sdk/python/tests/test_keep_client.py @@ -5,12 +5,15 @@ import pycurl import random import re import socket +import threading +import time import unittest import urlparse import arvados import arvados.retry import arvados_testutil as tutil +import keepstub import run_test_server class KeepTestCase(run_test_server.TestCaseWithServers): @@ -268,9 +271,10 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): self.assertEqual('100::1', service.hostname) self.assertEqual(10, service.port) - # test_get_timeout and test_put_timeout test that - # KeepClient.get and KeepClient.put use the appropriate timeouts - # when connected directly to a Keep server (i.e. non-proxy timeout) + # test_*_timeout verify that KeepClient instructs pycurl to use + # the appropriate connection and read timeouts. They don't care + # whether pycurl actually exhibits the expected timeout behavior + # -- those tests are in the KeepClientTimeout test class. def test_get_timeout(self): api_client = self.mock_keep_services(count=1) @@ -460,6 +464,91 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): self.assertEqual(2, len(exc_check.exception.request_errors())) +class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock): + DATA = 'x' * 2**10 + + class assertTakesBetween(unittest.TestCase): + def __init__(self, tmin, tmax): + self.tmin = tmin + self.tmax = tmax + + def __enter__(self): + self.t0 = time.time() + + def __exit__(self, *args, **kwargs): + self.assertGreater(time.time() - self.t0, self.tmin) + self.assertLess(time.time() - self.t0, self.tmax) + + def setUp(self): + sock = socket.socket() + sock.bind(('0.0.0.0', 0)) + self.port = sock.getsockname()[1] + sock.close() + self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler) + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.daemon = True # Exit thread if main proc exits + self.thread.start() + self.api_client = self.mock_keep_services( + count=1, + service_host='localhost', + service_port=self.port, + ) + + def tearDown(self): + self.server.shutdown() + + def keepClient(self, timeouts=(0.1, 1.0)): + return arvados.KeepClient( + api_client=self.api_client, + timeout=timeouts) + + def test_timeout_slow_connect(self): + # Can't simulate TCP delays with our own socket. Leave our + # stub server running uselessly, and try to connect to an + # unroutable IP address instead. + self.api_client = self.mock_keep_services( + count=1, + service_host='240.0.0.0', + ) + with self.assertTakesBetween(0.1, 0.5): + with self.assertRaises(arvados.errors.KeepWriteError): + self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0) + + def test_timeout_slow_request(self): + self.server.setdelays(request=0.2) + self._test_200ms() + + def test_timeout_slow_response(self): + self.server.setdelays(response=0.2) + self._test_200ms() + + def test_timeout_slow_response_body(self): + self.server.setdelays(response_body=0.2) + self._test_200ms() + + def _test_200ms(self): + """Connect should be t<100ms, request should be 200ms <= t < 300ms""" + + # Allow 100ms to connect, then 1s for response. Everything + # should work, and everything should take at least 200ms to + # return. + kc = self.keepClient((.1, 1)) + with self.assertTakesBetween(.2, .3): + loc = kc.put(self.DATA, copies=1, num_retries=0) + with self.assertTakesBetween(.2, .3): + self.assertEqual(self.DATA, kc.get(loc, num_retries=0)) + + # Allow 1s to connect, then 100ms for response. Nothing should + # work, and everything should take at least 100ms to return. + kc = self.keepClient((1, .1)) + with self.assertTakesBetween(.1, .2): + with self.assertRaises(arvados.errors.KeepReadError): + kc.get(loc, num_retries=0) + with self.assertTakesBetween(.1, .2): + with self.assertRaises(arvados.errors.KeepWriteError): + kc.put(self.DATA, copies=1, num_retries=0) + + class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock): def mock_disks_and_gateways(self, disks=3, gateways=1): self.gateways = [{ -- 2.30.2