From 441cb401a907ebca19042f73942a76d247bab847 Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Thu, 5 May 2016 17:25:39 -0400 Subject: [PATCH] 9135: Make EventClient initialization more consistent. * DRY up the setup code. * Make the client a daemon thread, for consistency with PollClient. * Always try to close the connection on failure. We do this in _subscribe_websocket, so why not do it on reconnect as well? * Remove the public connect() and close_connection() methods from EventClient. They're no longer needed, and avoid leaking implementation details. --- sdk/python/arvados/events.py | 33 ++++++++++++++++----------------- sdk/python/tests/test_events.py | 25 +++++++++++++++---------- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py index 8f7ae699cd..96d784046e 100644 --- a/sdk/python/arvados/events.py +++ b/sdk/python/arvados/events.py @@ -84,7 +84,17 @@ class EventClient(object): self.on_event_cb = on_event_cb self.last_log_id = last_log_id self.is_closed = False - self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed) + self._setup_event_client() + + def _setup_event_client(self): + self.ec = _EventClient(self.url, self.filters, self.on_event, + self.last_log_id, self.on_closed) + self.ec.daemon = True + try: + self.ec.connect() + except Exception: + self.ec.close_connection() + raise def subscribe(self, f, last_log_id=None): self.filters.append(f) @@ -112,8 +122,7 @@ class EventClient(object): _logger.warn("Unexpected close. Reconnecting.") for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15): try: - self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed) - self.ec.connect() + self._setup_event_client() break except Exception as e: _logger.warn("Error '%s' during websocket reconnect.", e) @@ -123,10 +132,8 @@ class EventClient(object): thread.interrupt_main() return - def _delegate_to_ec(attr_name): - return property(lambda self: getattr(self.ec, attr_name)) - for _method_name in ['connect', 'close_connection', 'run_forever']: - locals()[_method_name] = _delegate_to_ec(_method_name) + def run_forever(self): + return self.ec.run_forever() class PollClient(threading.Thread): @@ -249,18 +256,10 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None): if not endpoint: raise errors.FeatureNotEnabledError( "Server does not advertise a websocket endpoint") + uri_with_token = "{}?api_token={}".format(endpoint, api.api_token) try: - uri_with_token = "{}?api_token={}".format(endpoint, api.api_token) client = EventClient(uri_with_token, filters, on_event, last_log_id) - ok = False - try: - client.connect() - ok = True - return client - finally: - if not ok: - client.close_connection() - except: + except Exception: _logger.warn("Failed to connect to websockets on %s" % endpoint) raise diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py index 81beba59b7..23bd024adc 100644 --- a/sdk/python/tests/test_events.py +++ b/sdk/python/tests/test_events.py @@ -26,6 +26,14 @@ class EventClientTestCase(unittest.TestCase): yield arvados.events.EventClient(TEST_WS_URL, filters, callback, last_log_id), ws_mock + def test_client_init(self): + with self.mocked_client() as client_tuple: + client, ws_mock = client_tuple + ws_mock.assert_called_with(TEST_WS_URL, [[]], client.on_event, None, + client.on_closed) + ws_mock().connect.assert_called() + self.assertIs(ws_mock().daemon, True) + def test_subscribe_calls_ws(self): ws_filter = ['kind', '=', 'arvados#test'] with self.mocked_client() as client_tuple: @@ -41,19 +49,10 @@ class EventClientTestCase(unittest.TestCase): client.unsubscribe(ws_filter) ws_mock().unsubscribe.assert_called_with(ws_filter) - # PollClient doesn't have this method, but for now you have to call it - # for anything to work. - def test_connect_calls_ws(self): - with self.mocked_client() as client_tuple: - client, ws_mock = client_tuple - client.connect() - ws_mock().connect.assert_called() - def test_close_calls_ws(self): with self.mocked_client() as client_tuple: client, ws_mock = client_tuple ws_mock().close.side_effect = lambda *args: client.on_closed() - client.connect() client.close() ws_mock().close.assert_called() # Check on_closed did not try to reconnect. @@ -62,6 +61,12 @@ class EventClientTestCase(unittest.TestCase): def test_run_forever_calls_ws(self): with self.mocked_client() as client_tuple: client, ws_mock = client_tuple - client.connect() client.run_forever() ws_mock().run_forever.assert_called() + + def test_reconnect_rebuilds_and_reconnects_underlying_client(self): + with self.mocked_client() as client_tuple: + client, ws_mock = client_tuple + client.on_closed() + self.assertEqual(2, ws_mock.call_count) + self.assertEqual(2, ws_mock().connect.call_count) -- 2.30.2