X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/adedc9a3791fbed1d08da4d8681f1d0e0b36e024..d600f6dbb82e46922b17ff93a877a76131c26ebc:/sdk/python/tests/test_events.py diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py index f5192160f3..b4e6a0b1cd 100644 --- a/sdk/python/tests/test_events.py +++ b/sdk/python/tests/test_events.py @@ -2,13 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import print_function -from __future__ import absolute_import -from __future__ import division -from future import standard_library -standard_library.install_aliases() -from builtins import range -from builtins import object +import json import logging import mock import queue @@ -17,10 +11,62 @@ import threading import time import unittest +import websockets.exceptions as ws_exc + import arvados from . import arvados_testutil as tutil from . import run_test_server +class FakeWebsocketClient: + """Fake self-contained version of websockets.sync.client.ClientConnection + + This provides enough of the API to test EventClient. It loosely mimics + the Arvados WebSocket API by acknowledging subscribe messages. You can use + `mock_wrapper` to test calls. You can set `_check_lock` to test that the + given lock is acquired before `send` is called. + """ + + def __init__(self): + self._check_lock = None + self._closed = threading.Event() + self._messages = queue.Queue() + + def mock_wrapper(self): + wrapper = mock.Mock(wraps=self) + wrapper.__iter__ = lambda _: self.__iter__() + return wrapper + + def __iter__(self): + while True: + msg = self._messages.get() + self._messages.task_done() + if isinstance(msg, Exception): + raise msg + else: + yield msg + + def close(self, code=1000, reason=''): + if not self._closed.is_set(): + self._closed.set() + self.force_disconnect() + + def force_disconnect(self): + self._messages.put(ws_exc.ConnectionClosed(None, None)) + + def send(self, msg): + if self._check_lock is not None and self._check_lock.acquire(blocking=False): + self._check_lock.release() + raise AssertionError(f"called ws_client.send() without lock") + elif self._closed.is_set(): + raise ws_exc.ConnectionClosed(None, None) + try: + msg = json.loads(msg) + except ValueError: + status = 400 + else: + status = 200 + self._messages.put(json.dumps({'status': status})) + class WebsocketTest(run_test_server.TestCaseWithServers): MAIN_SERVER = {} @@ -201,7 +247,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers): # close (im)properly if close_unexpected: - self.ws.ec.close_connection() + self.ws._client.close() else: self.ws.close() @@ -240,69 +286,115 @@ class WebsocketTest(run_test_server.TestCaseWithServers): self._test_websocket_reconnect(False) # Test websocket reconnection retry - @mock.patch('arvados.events._EventClient.connect') - def test_websocket_reconnect_retry(self, event_client_connect): - event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None] - + @mock.patch('arvados.events.ws_client.connect') + def test_websocket_reconnect_retry(self, ws_conn): logstream = tutil.StringIO() rootLogger = logging.getLogger() streamHandler = logging.StreamHandler(logstream) rootLogger.addHandler(streamHandler) - - run_test_server.authorize_with('active') - events = queue.Queue(100) - - filters = [['object_uuid', 'is_a', 'arvados#human']] - self.ws = arvados.events.subscribe( - arvados.api('v1'), filters, - events.put_nowait, - poll_fallback=False, - last_log_id=None) - self.assertIsInstance(self.ws, arvados.events.EventClient) - - # simulate improper close - self.ws.on_closed() - - # verify log messages to ensure retry happened - log_messages = logstream.getvalue() - found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.") - self.assertNotEqual(found, -1) - rootLogger.removeHandler(streamHandler) - - @mock.patch('arvados.events._EventClient') - def test_subscribe_method(self, websocket_client): - filters = [['object_uuid', 'is_a', 'arvados#human']] - client = arvados.events.EventClient( - self.MOCK_WS_URL, [], lambda event: None, None) - client.subscribe(filters[:], 99) - websocket_client().subscribe.assert_called_with(filters, 99) - - @mock.patch('arvados.events._EventClient') - def test_unsubscribe(self, websocket_client): - filters = [['object_uuid', 'is_a', 'arvados#human']] - client = arvados.events.EventClient( - self.MOCK_WS_URL, filters[:], lambda event: None, None) - client.unsubscribe(filters[:]) - websocket_client().unsubscribe.assert_called_with(filters) - - @mock.patch('arvados.events._EventClient') + try: + msg_event, wss_client, self.ws = self.fake_client(ws_conn) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + ws_conn.side_effect = [Exception('EventClient.connect error'), wss_client] + wss_client.force_disconnect() + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for reconnect callback") + # verify log messages to ensure retry happened + self.assertIn("Error 'EventClient.connect error' during websocket reconnect.", logstream.getvalue()) + self.assertEqual(ws_conn.call_count, 3) + finally: + rootLogger.removeHandler(streamHandler) + + @mock.patch('arvados.events.ws_client.connect') def test_run_forever_survives_reconnects(self, websocket_client): - connected = threading.Event() - websocket_client().connect.side_effect = connected.set client = arvados.events.EventClient( self.MOCK_WS_URL, [], lambda event: None, None) forever_thread = threading.Thread(target=client.run_forever) forever_thread.start() # Simulate an unexpected disconnect, and wait for reconnect. - close_thread = threading.Thread(target=client.on_closed) - close_thread.start() - self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT)) - close_thread.join() - run_forever_alive = forever_thread.is_alive() - client.close() - forever_thread.join() - self.assertTrue(run_forever_alive) - self.assertEqual(2, websocket_client().connect.call_count) + try: + client.on_closed() + self.assertTrue(forever_thread.is_alive()) + self.assertEqual(2, websocket_client.call_count) + finally: + client.close() + forever_thread.join() + + @staticmethod + def fake_client(conn_patch, filters=None, url=MOCK_WS_URL): + """Set up EventClient test infrastructure + + Given a patch of `arvados.events.ws_client.connect`, + this returns a 3-tuple: + + * `msg_event` is a `threading.Event` that is set as the test client + event callback. You can wait for this event to confirm that a + sent message has been acknowledged and processed. + + * `mock_client` is a `mock.Mock` wrapper around `FakeWebsocketClient`. + Use this to assert `EventClient` calls the right methods. It tests + that `EventClient` acquires a lock before calling `send`. + + * `client` is the `EventClient` that uses `mock_client` under the hood + that you exercise methods of. + + Other arguments are passed to initialize `EventClient`. + """ + msg_event = threading.Event() + fake_client = FakeWebsocketClient() + mock_client = fake_client.mock_wrapper() + conn_patch.return_value = mock_client + client = arvados.events.EventClient(url, filters, lambda _: msg_event.set()) + fake_client._check_lock = client._subscribe_lock + return msg_event, mock_client, client + + @mock.patch('arvados.events.ws_client.connect') + def test_subscribe_locking(self, ws_conn): + f = [['created_at', '>=', '2023-12-01T00:00:00.000Z']] + msg_event, wss_client, self.ws = self.fake_client(ws_conn) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + wss_client.send.reset_mock() + self.ws.subscribe(f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for subscribe callback") + wss_client.send.assert_called() + (msg,), _ = wss_client.send.call_args + self.assertEqual( + json.loads(msg), + {'method': 'subscribe', 'filters': f}, + ) + + @mock.patch('arvados.events.ws_client.connect') + def test_unsubscribe_locking(self, ws_conn): + f = [['created_at', '>=', '2023-12-01T01:00:00.000Z']] + msg_event, wss_client, self.ws = self.fake_client(ws_conn, f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + wss_client.send.reset_mock() + self.ws.unsubscribe(f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for unsubscribe callback") + wss_client.send.assert_called() + (msg,), _ = wss_client.send.call_args + self.assertEqual( + json.loads(msg), + {'method': 'unsubscribe', 'filters': f}, + ) + + @mock.patch('arvados.events.ws_client.connect') + def test_resubscribe_locking(self, ws_conn): + f = [['created_at', '>=', '2023-12-01T02:00:00.000Z']] + msg_event, wss_client, self.ws = self.fake_client(ws_conn, f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + wss_client.send.reset_mock() + wss_client.force_disconnect() + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for resubscribe callback") + wss_client.send.assert_called() + (msg,), _ = wss_client.send.call_args + self.assertEqual( + json.loads(msg), + {'method': 'subscribe', 'filters': f}, + ) class PollClientTestCase(unittest.TestCase):