--- /dev/null
+import BaseHTTPServer
+import hashlib
+import os
+import re
+import SocketServer
+import time
+
+class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
+
+ allow_reuse_address = 1
+
+ def __init__(self, *args, **kwargs):
+ self.store = {}
+ self.delays = {
+ # before reading request headers
+ 'request': 0,
+ # before reading request body
+ 'request_body': 0,
+ # before setting response status and headers
+ 'response': 0,
+ # before sending response body
+ 'response_body': 0,
+ # before returning from handler (thus setting response EOF)
+ 'response_close': 0,
+ }
+ super(Server, self).__init__(*args, **kwargs)
+
+ def setdelays(self, **kwargs):
+ """In future requests, induce delays at the given checkpoints."""
+ for (k, v) in kwargs.iteritems():
+ self.delays.get(k) # NameError if unknown key
+ self.delays[k] = v
+
+ def _sleep_at_least(self, seconds):
+ """Sleep for given time, even if signals are received."""
+ wake = time.time() + seconds
+ todo = seconds
+ while todo > 0:
+ time.sleep(todo)
+ todo = wake - time.time()
+
+ def _do_delay(self, k):
+ self._sleep_at_least(self.delays[k])
+
+
+class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+ def handle(self, *args, **kwargs):
+ self.server._do_delay('request')
+ return super(Handler, self).handle(*args, **kwargs)
+
+ def do_GET(self):
+ self.server._do_delay('response')
+ r = re.search(r'[0-9a-f]{32}', self.path)
+ if not r:
+ return self.send_response(422)
+ datahash = r.group(0)
+ if datahash not in self.server.store:
+ return self.send_response(404)
+ self.send_response(200)
+ self.send_header('Content-type', 'application/octet-stream')
+ self.end_headers()
+ self.server._do_delay('response_body')
+ self.wfile.write(self.server.store[datahash])
+ self.server._do_delay('response_close')
+
+ def do_PUT(self):
+ self.server._do_delay('request_body')
+ data = self.rfile.read(int(self.headers.getheader('content-length')))
+ datahash = hashlib.md5(data).hexdigest()
+ self.server.store[datahash] = data
+ self.server._do_delay('response')
+ self.send_response(200)
+ self.send_header('Content-type', 'text/plain')
+ self.end_headers()
+ self.server._do_delay('response_body')
+ self.wfile.write(datahash + '+' + str(len(data)))
+ self.server._do_delay('response_close')
+
+ def log_request(self, *args, **kwargs):
+ if os.environ.get('ARVADOS_DEBUG', None):
+ super(Handler, self).log_request(*args, **kwargs)
+
+ def finish(self, *args, **kwargs):
+ """Ignore exceptions, notably "Broken pipe" when client times out."""
+ try:
+ return super(Handler, self).finish(*args, **kwargs)
+ except:
+ pass
+
+ def handle_one_request(self, *args, **kwargs):
+ """Ignore exceptions, notably "Broken pipe" when client times out."""
+ try:
+ return super(Handler, self).handle_one_request(*args, **kwargs)
+ except:
+ pass
import random
import re
import socket
+import threading
+import time
import unittest
import urlparse
import arvados
import arvados.retry
import arvados_testutil as tutil
+import keepstub
import run_test_server
class KeepTestCase(run_test_server.TestCaseWithServers):
self.assertEqual('100::1', service.hostname)
self.assertEqual(10, service.port)
- # test_get_timeout and test_put_timeout test that
- # KeepClient.get and KeepClient.put use the appropriate timeouts
- # when connected directly to a Keep server (i.e. non-proxy timeout)
+ # test_*_timeout verify that KeepClient instructs pycurl to use
+ # the appropriate connection and read timeouts. They don't care
+ # whether pycurl actually exhibits the expected timeout behavior
+ # -- those tests are in the KeepClientTimeout test class.
def test_get_timeout(self):
api_client = self.mock_keep_services(count=1)
self.assertEqual(2, len(exc_check.exception.request_errors()))
+class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
+ DATA = 'x' * 2**10
+
+ class assertTakesBetween(unittest.TestCase):
+ def __init__(self, tmin, tmax):
+ self.tmin = tmin
+ self.tmax = tmax
+
+ def __enter__(self):
+ self.t0 = time.time()
+
+ def __exit__(self, *args, **kwargs):
+ self.assertGreater(time.time() - self.t0, self.tmin)
+ self.assertLess(time.time() - self.t0, self.tmax)
+
+ def setUp(self):
+ sock = socket.socket()
+ sock.bind(('0.0.0.0', 0))
+ self.port = sock.getsockname()[1]
+ sock.close()
+ self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ self.thread.daemon = True # Exit thread if main proc exits
+ self.thread.start()
+ self.api_client = self.mock_keep_services(
+ count=1,
+ service_host='localhost',
+ service_port=self.port,
+ )
+
+ def tearDown(self):
+ self.server.shutdown()
+
+ def keepClient(self, timeouts=(0.1, 1.0)):
+ return arvados.KeepClient(
+ api_client=self.api_client,
+ timeout=timeouts)
+
+ def test_timeout_slow_connect(self):
+ # Can't simulate TCP delays with our own socket. Leave our
+ # stub server running uselessly, and try to connect to an
+ # unroutable IP address instead.
+ self.api_client = self.mock_keep_services(
+ count=1,
+ service_host='240.0.0.0',
+ )
+ with self.assertTakesBetween(0.1, 0.5):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+
+ def test_timeout_slow_request(self):
+ self.server.setdelays(request=0.2)
+ self._test_200ms()
+
+ def test_timeout_slow_response(self):
+ self.server.setdelays(response=0.2)
+ self._test_200ms()
+
+ def test_timeout_slow_response_body(self):
+ self.server.setdelays(response_body=0.2)
+ self._test_200ms()
+
+ def _test_200ms(self):
+ """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+
+ # Allow 100ms to connect, then 1s for response. Everything
+ # should work, and everything should take at least 200ms to
+ # return.
+ kc = self.keepClient((.1, 1))
+ with self.assertTakesBetween(.2, .3):
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ with self.assertTakesBetween(.2, .3):
+ self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+ # Allow 1s to connect, then 100ms for response. Nothing should
+ # work, and everything should take at least 100ms to return.
+ kc = self.keepClient((1, .1))
+ with self.assertTakesBetween(.1, .2):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ kc.get(loc, num_retries=0)
+ with self.assertTakesBetween(.1, .2):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
+
+
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
def mock_disks_and_gateways(self, disks=3, gateways=1):
self.gateways = [{