From: Brett Smith Date: Mon, 9 May 2016 16:54:23 +0000 (-0400) Subject: 9135: Bring EventClient's public interface closer to PollClient's. X-Git-Tag: 1.1.0~941^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/839a409f51114316c0683fbd00d8174fc24b4a9f 9135: Bring EventClient's public interface closer to PollClient's. * Restore the run_forever method, which was previously inherited from WebSocketClient. * Remove the connect and close_connection methods, which are WebSocketClient implementation details that don't make sense as part of the public interface. (A running EventClient will just reconnect if you call close_connection on it.) --- diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py index 68cfa21870..81a9b36182 100644 --- a/sdk/python/arvados/events.py +++ b/sdk/python/arvados/events.py @@ -83,7 +83,7 @@ class EventClient(object): self.filters = [[]] self.on_event_cb = on_event_cb self.last_log_id = last_log_id - self.is_closed = False + self.is_closed = threading.Event() self._setup_event_client() def _setup_event_client(self): @@ -96,12 +96,6 @@ class EventClient(object): self.ec.close_connection() raise - def connect(self): - self.ec.connect() - - def close_connection(self): - self.ec.close_connection() - def subscribe(self, f, last_log_id=None): self.filters.append(f) self.ec.subscribe(f, last_log_id) @@ -111,7 +105,7 @@ class EventClient(object): self.ec.unsubscribe(f) def close(self, code=1000, reason='', timeout=0): - self.is_closed = True + self.is_closed.set() self.ec.close(code, reason, timeout) def on_event(self, m): @@ -124,7 +118,7 @@ class EventClient(object): thread.interrupt_main() def on_closed(self): - if self.is_closed == False: + if not self.is_closed.is_set(): _logger.warn("Unexpected close. Reconnecting.") for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15): try: @@ -134,10 +128,15 @@ class EventClient(object): _logger.warn("Error '%s' during websocket reconnect.", e) if tries_left == 0: _logger.exception("EventClient thread could not contact websocket server.") - self.is_closed = True + self.is_closed.set() thread.interrupt_main() return + def run_forever(self): + # Have to poll here to let KeyboardInterrupt get raised. + while not self.is_closed.wait(1): + pass + class PollClient(threading.Thread): def __init__(self, api, filters, on_event, poll_time, last_log_id): diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py index 7b69fa2cfb..f2cdba28c7 100644 --- a/sdk/python/tests/test_events.py +++ b/sdk/python/tests/test_events.py @@ -176,7 +176,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers): # close (im)properly if close_unexpected: - self.ws.close_connection() + self.ws.ec.close_connection() else: self.ws.close() @@ -260,13 +260,28 @@ class WebsocketTest(run_test_server.TestCaseWithServers): client.unsubscribe(filters[:]) websocket_client().unsubscribe.assert_called_with(filters) - @unittest.expectedFailure @mock.patch('arvados.events._EventClient') - def test_run_forever(self, websocket_client): + def test_run_forever_survives_reconnects(self, websocket_client): + connection_cond = threading.Condition() + def ws_connect(): + with connection_cond: + connection_cond.notify_all() + websocket_client().connect.side_effect = ws_connect client = arvados.events.EventClient( self.MOCK_WS_URL, [], lambda event: None, None) - client.run_forever() - websocket_client().run_forever.assert_called_with() + with connection_cond: + forever_thread = threading.Thread(target=client.run_forever) + forever_thread.start() + # Simulate an unexpected disconnect, and wait for reconnect. + close_thread = threading.Thread(target=client.on_closed) + close_thread.start() + connection_cond.wait() + close_thread.join() + run_forever_alive = forever_thread.is_alive() + client.close() + forever_thread.join() + self.assertTrue(run_forever_alive) + self.assertEqual(2, websocket_client().connect.call_count) class PollClientTestCase(unittest.TestCase):