5562: Add fake keepstore server with configurable problems.
authorTom Clegg <tom@curoverse.com>
Tue, 28 Apr 2015 23:08:45 +0000 (19:08 -0400)
committerTom Clegg <tom@curoverse.com>
Wed, 29 Apr 2015 04:39:29 +0000 (00:39 -0400)
sdk/python/tests/keepstub.py [new file with mode: 0644]
sdk/python/tests/test_keep_client.py

diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
new file mode 100644 (file)
index 0000000..e84230a
--- /dev/null
@@ -0,0 +1,95 @@
+import BaseHTTPServer
+import hashlib
+import os
+import re
+import SocketServer
+import time
+
+class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
+
+    allow_reuse_address = 1
+
+    def __init__(self, *args, **kwargs):
+        self.store = {}
+        self.delays = {
+            # before reading request headers
+            'request': 0,
+            # before reading request body
+            'request_body': 0,
+            # before setting response status and headers
+            'response': 0,
+            # before sending response body
+            'response_body': 0,
+            # before returning from handler (thus setting response EOF)
+            'response_close': 0,
+        }
+        super(Server, self).__init__(*args, **kwargs)
+
+    def setdelays(self, **kwargs):
+        """In future requests, induce delays at the given checkpoints."""
+        for (k, v) in kwargs.iteritems():
+            self.delays.get(k) # NameError if unknown key
+            self.delays[k] = v
+
+    def _sleep_at_least(self, seconds):
+        """Sleep for given time, even if signals are received."""
+        wake = time.time() + seconds
+        todo = seconds
+        while todo > 0:
+            time.sleep(todo)
+            todo = wake - time.time()
+
+    def _do_delay(self, k):
+        self._sleep_at_least(self.delays[k])
+
+
+class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+    def handle(self, *args, **kwargs):
+        self.server._do_delay('request')
+        return super(Handler, self).handle(*args, **kwargs)
+
+    def do_GET(self):
+        self.server._do_delay('response')
+        r = re.search(r'[0-9a-f]{32}', self.path)
+        if not r:
+            return self.send_response(422)
+        datahash = r.group(0)
+        if datahash not in self.server.store:
+            return self.send_response(404)
+        self.send_response(200)
+        self.send_header('Content-type', 'application/octet-stream')
+        self.end_headers()
+        self.server._do_delay('response_body')
+        self.wfile.write(self.server.store[datahash])
+        self.server._do_delay('response_close')
+
+    def do_PUT(self):
+        self.server._do_delay('request_body')
+        data = self.rfile.read(int(self.headers.getheader('content-length')))
+        datahash = hashlib.md5(data).hexdigest()
+        self.server.store[datahash] = data
+        self.server._do_delay('response')
+        self.send_response(200)
+        self.send_header('Content-type', 'text/plain')
+        self.end_headers()
+        self.server._do_delay('response_body')
+        self.wfile.write(datahash + '+' + str(len(data)))
+        self.server._do_delay('response_close')
+
+    def log_request(self, *args, **kwargs):
+        if os.environ.get('ARVADOS_DEBUG', None):
+            super(Handler, self).log_request(*args, **kwargs)
+
+    def finish(self, *args, **kwargs):
+        """Ignore exceptions, notably "Broken pipe" when client times out."""
+        try:
+            return super(Handler, self).finish(*args, **kwargs)
+        except:
+            pass
+
+    def handle_one_request(self, *args, **kwargs):
+        """Ignore exceptions, notably "Broken pipe" when client times out."""
+        try:
+            return super(Handler, self).handle_one_request(*args, **kwargs)
+        except:
+            pass
index 0c42c2f18cdf1221bf14c18483f9393e4309d66b..419f1ce08226339b2549da69f039e32869af27c5 100644 (file)
@@ -5,12 +5,15 @@ import pycurl
 import random
 import re
 import socket
+import threading
+import time
 import unittest
 import urlparse
 
 import arvados
 import arvados.retry
 import arvados_testutil as tutil
+import keepstub
 import run_test_server
 
 class KeepTestCase(run_test_server.TestCaseWithServers):
@@ -268,9 +271,10 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('100::1', service.hostname)
         self.assertEqual(10, service.port)
 
-    # test_get_timeout and test_put_timeout test that
-    # KeepClient.get and KeepClient.put use the appropriate timeouts
-    # when connected directly to a Keep server (i.e. non-proxy timeout)
+    # test_*_timeout verify that KeepClient instructs pycurl to use
+    # the appropriate connection and read timeouts. They don't care
+    # whether pycurl actually exhibits the expected timeout behavior
+    # -- those tests are in the KeepClientTimeout test class.
 
     def test_get_timeout(self):
         api_client = self.mock_keep_services(count=1)
@@ -460,6 +464,91 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(2, len(exc_check.exception.request_errors()))
 
 
+class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
+    DATA = 'x' * 2**10
+
+    class assertTakesBetween(unittest.TestCase):
+        def __init__(self, tmin, tmax):
+            self.tmin = tmin
+            self.tmax = tmax
+
+        def __enter__(self):
+            self.t0 = time.time()
+
+        def __exit__(self, *args, **kwargs):
+            self.assertGreater(time.time() - self.t0, self.tmin)
+            self.assertLess(time.time() - self.t0, self.tmax)
+
+    def setUp(self):
+        sock = socket.socket()
+        sock.bind(('0.0.0.0', 0))
+        self.port = sock.getsockname()[1]
+        sock.close()
+        self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
+        self.thread = threading.Thread(target=self.server.serve_forever)
+        self.thread.daemon = True # Exit thread if main proc exits
+        self.thread.start()
+        self.api_client = self.mock_keep_services(
+            count=1,
+            service_host='localhost',
+            service_port=self.port,
+        )
+
+    def tearDown(self):
+        self.server.shutdown()
+
+    def keepClient(self, timeouts=(0.1, 1.0)):
+        return arvados.KeepClient(
+            api_client=self.api_client,
+            timeout=timeouts)
+
+    def test_timeout_slow_connect(self):
+        # Can't simulate TCP delays with our own socket. Leave our
+        # stub server running uselessly, and try to connect to an
+        # unroutable IP address instead.
+        self.api_client = self.mock_keep_services(
+            count=1,
+            service_host='240.0.0.0',
+        )
+        with self.assertTakesBetween(0.1, 0.5):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+
+    def test_timeout_slow_request(self):
+        self.server.setdelays(request=0.2)
+        self._test_200ms()
+
+    def test_timeout_slow_response(self):
+        self.server.setdelays(response=0.2)
+        self._test_200ms()
+
+    def test_timeout_slow_response_body(self):
+        self.server.setdelays(response_body=0.2)
+        self._test_200ms()
+
+    def _test_200ms(self):
+        """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+
+        # Allow 100ms to connect, then 1s for response. Everything
+        # should work, and everything should take at least 200ms to
+        # return.
+        kc = self.keepClient((.1, 1))
+        with self.assertTakesBetween(.2, .3):
+            loc = kc.put(self.DATA, copies=1, num_retries=0)
+        with self.assertTakesBetween(.2, .3):
+            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+        # Allow 1s to connect, then 100ms for response. Nothing should
+        # work, and everything should take at least 100ms to return.
+        kc = self.keepClient((1, .1))
+        with self.assertTakesBetween(.1, .2):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                kc.get(loc, num_retries=0)
+        with self.assertTakesBetween(.1, .2):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+
 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
     def mock_disks_and_gateways(self, disks=3, gateways=1):
         self.gateways = [{