self.filters = [[]]
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
- self.is_closed = False
+ self.is_closed = threading.Event()
self._setup_event_client()
def _setup_event_client(self):
self.ec.close_connection()
raise
- def connect(self):
- self.ec.connect()
-
- def close_connection(self):
- self.ec.close_connection()
-
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
self.ec.subscribe(f, last_log_id)
self.ec.unsubscribe(f)
def close(self, code=1000, reason='', timeout=0):
- self.is_closed = True
+ self.is_closed.set()
self.ec.close(code, reason, timeout)
def on_event(self, m):
thread.interrupt_main()
def on_closed(self):
- if self.is_closed == False:
+ if not self.is_closed.is_set():
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
_logger.warn("Error '%s' during websocket reconnect.", e)
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
- self.is_closed = True
+ self.is_closed.set()
thread.interrupt_main()
return
+ def run_forever(self):
+ # Have to poll here to let KeyboardInterrupt get raised.
+ while not self.is_closed.wait(1):
+ pass
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
# close (im)properly
if close_unexpected:
- self.ws.close_connection()
+ self.ws.ec.close_connection()
else:
self.ws.close()
client.unsubscribe(filters[:])
websocket_client().unsubscribe.assert_called_with(filters)
- @unittest.expectedFailure
@mock.patch('arvados.events._EventClient')
- def test_run_forever(self, websocket_client):
+ def test_run_forever_survives_reconnects(self, websocket_client):
+ connection_cond = threading.Condition()
+ def ws_connect():
+ with connection_cond:
+ connection_cond.notify_all()
+ websocket_client().connect.side_effect = ws_connect
client = arvados.events.EventClient(
self.MOCK_WS_URL, [], lambda event: None, None)
- client.run_forever()
- websocket_client().run_forever.assert_called_with()
+ with connection_cond:
+ forever_thread = threading.Thread(target=client.run_forever)
+ forever_thread.start()
+ # Simulate an unexpected disconnect, and wait for reconnect.
+ close_thread = threading.Thread(target=client.on_closed)
+ close_thread.start()
+ connection_cond.wait()
+ close_thread.join()
+ run_forever_alive = forever_thread.is_alive()
+ client.close()
+ forever_thread.join()
+ self.assertTrue(run_forever_alive)
+ self.assertEqual(2, websocket_client().connect.call_count)
class PollClientTestCase(unittest.TestCase):