self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
self.is_closed = False
- self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+ self._setup_event_client()
+
+ def _setup_event_client(self):
+ self.ec = _EventClient(self.url, self.filters, self.on_event,
+ self.last_log_id, self.on_closed)
+ self.ec.daemon = True
+ try:
+ self.ec.connect()
+ except Exception:
+ self.ec.close_connection()
+ raise
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
- self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
- self.ec.connect()
+ self._setup_event_client()
break
except Exception as e:
_logger.warn("Error '%s' during websocket reconnect.", e)
thread.interrupt_main()
return
- def _delegate_to_ec(attr_name):
- return property(lambda self: getattr(self.ec, attr_name))
- for _method_name in ['connect', 'close_connection', 'run_forever']:
- locals()[_method_name] = _delegate_to_ec(_method_name)
+ def run_forever(self):
+ return self.ec.run_forever()
class PollClient(threading.Thread):
if not endpoint:
raise errors.FeatureNotEnabledError(
"Server does not advertise a websocket endpoint")
+ uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
try:
- uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
client = EventClient(uri_with_token, filters, on_event, last_log_id)
- ok = False
- try:
- client.connect()
- ok = True
- return client
- finally:
- if not ok:
- client.close_connection()
- except:
+ except Exception:
_logger.warn("Failed to connect to websockets on %s" % endpoint)
raise
yield arvados.events.EventClient(TEST_WS_URL, filters, callback,
last_log_id), ws_mock
+ def test_client_init(self):
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ ws_mock.assert_called_with(TEST_WS_URL, [[]], client.on_event, None,
+ client.on_closed)
+ ws_mock().connect.assert_called()
+ self.assertIs(ws_mock().daemon, True)
+
def test_subscribe_calls_ws(self):
ws_filter = ['kind', '=', 'arvados#test']
with self.mocked_client() as client_tuple:
client.unsubscribe(ws_filter)
ws_mock().unsubscribe.assert_called_with(ws_filter)
- # PollClient doesn't have this method, but for now you have to call it
- # for anything to work.
- def test_connect_calls_ws(self):
- with self.mocked_client() as client_tuple:
- client, ws_mock = client_tuple
- client.connect()
- ws_mock().connect.assert_called()
-
def test_close_calls_ws(self):
with self.mocked_client() as client_tuple:
client, ws_mock = client_tuple
ws_mock().close.side_effect = lambda *args: client.on_closed()
- client.connect()
client.close()
ws_mock().close.assert_called()
# Check on_closed did not try to reconnect.
def test_run_forever_calls_ws(self):
with self.mocked_client() as client_tuple:
client, ws_mock = client_tuple
- client.connect()
client.run_forever()
ws_mock().run_forever.assert_called()
+
+ def test_reconnect_rebuilds_and_reconnects_underlying_client(self):
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ client.on_closed()
+ self.assertEqual(2, ws_mock.call_count)
+ self.assertEqual(2, ws_mock().connect.call_count)