X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7fde9a0a090e5f092b946f7569f1350778a47574..HEAD:/sdk/python/tests/test_events.py diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py index 4596b6cdfb..c5f3310015 100644 --- a/sdk/python/tests/test_events.py +++ b/sdk/python/tests/test_events.py @@ -1,28 +1,80 @@ -from __future__ import print_function -from __future__ import absolute_import -from __future__ import division -from future import standard_library -standard_library.install_aliases() -from builtins import range -from builtins import object -import arvados -import io +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import json import logging -import mock import queue -from . import run_test_server +import sys import threading import time import unittest -from . import arvados_testutil +from unittest import mock + +import websockets.exceptions as ws_exc + +import arvados +from . import arvados_testutil as tutil +from . import run_test_server + +class FakeWebsocketClient: + """Fake self-contained version of websockets.sync.client.ClientConnection + + This provides enough of the API to test EventClient. It loosely mimics + the Arvados WebSocket API by acknowledging subscribe messages. You can use + `mock_wrapper` to test calls. You can set `_check_lock` to test that the + given lock is acquired before `send` is called. + """ + + def __init__(self): + self._check_lock = None + self._closed = threading.Event() + self._messages = queue.Queue() + + def mock_wrapper(self): + wrapper = mock.Mock(wraps=self) + wrapper.__iter__ = lambda _: self.__iter__() + return wrapper + + def __iter__(self): + while True: + msg = self._messages.get() + self._messages.task_done() + if isinstance(msg, Exception): + raise msg + else: + yield msg + + def close(self, code=1000, reason=''): + if not self._closed.is_set(): + self._closed.set() + self.force_disconnect() + + def force_disconnect(self): + self._messages.put(ws_exc.ConnectionClosed(None, None)) + + def send(self, msg): + if self._check_lock is not None and self._check_lock.acquire(blocking=False): + self._check_lock.release() + raise AssertionError(f"called ws_client.send() without lock") + elif self._closed.is_set(): + raise ws_exc.ConnectionClosed(None, None) + try: + msg = json.loads(msg) + except ValueError: + status = 400 + else: + status = 200 + self._messages.put(json.dumps({'status': status})) + class WebsocketTest(run_test_server.TestCaseWithServers): MAIN_SERVER = {} TIME_PAST = time.time()-3600 TIME_FUTURE = time.time()+3600 - MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST) + MOCK_WS_URL = 'wss://[{}]/'.format(tutil.TEST_HOST) TEST_TIMEOUT = 10.0 @@ -34,7 +86,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers): if self.ws: self.ws.close() except Exception as e: - print(("Error in teardown: ", e)) + print("Error in teardown: ", e) super(WebsocketTest, self).tearDown() run_test_server.reset() @@ -45,9 +97,9 @@ class WebsocketTest(run_test_server.TestCaseWithServers): # Create ancestor before subscribing. # When listening with start_time in the past, this should also be retrieved. # However, when start_time is omitted in subscribe, this should not be fetched. - ancestor = arvados.api('v1').humans().create(body={}).execute() + ancestor = arvados.api('v1').collections().create(body={}).execute() - filters = [['object_uuid', 'is_a', 'arvados#human']] + filters = [['object_uuid', 'is_a', 'arvados#collection']] if start_time: filters.append(['created_at', '>=', start_time]) @@ -58,11 +110,19 @@ class WebsocketTest(run_test_server.TestCaseWithServers): last_log_id=(1 if start_time else None)) self.assertIsInstance(self.ws, expect_type) self.assertEqual(200, events.get(True, 5)['status']) - human = arvados.api('v1').humans().create(body={}).execute() + + if hasattr(self.ws, '_skip_old_events'): + # Avoid race by waiting for the first "find ID threshold" + # poll to finish. + deadline = time.time() + 10 + while not self.ws._skip_old_events: + self.assertLess(time.time(), deadline) + time.sleep(0.1) + collection = arvados.api('v1').collections().create(body={}).execute() want_uuids = [] if expected > 0: - want_uuids.append(human['uuid']) + want_uuids.append(collection['uuid']) if expected > 1: want_uuids.append(ancestor['uuid']) log_object_uuids = [] @@ -96,10 +156,12 @@ class WebsocketTest(run_test_server.TestCaseWithServers): error_mock = mock.MagicMock() error_mock.resp.status = 0 error_mock._get_reason.return_value = "testing" - api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""), - {"items": [{"id": 1}], "items_available": 1}, - arvados.errors.ApiError(error_mock, ""), - {"items": [{"id": 1}], "items_available": 1}) + api_mock.logs().list().execute.side_effect = ( + arvados.errors.ApiError(error_mock, b""), + {"items": [{"id": 1}], "items_available": 1}, + arvados.errors.ApiError(error_mock, b""), + {"items": [{"id": 1}], "items_available": 1}, + ) pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None) pc.start() while len(n) < 2: @@ -156,17 +218,17 @@ class WebsocketTest(run_test_server.TestCaseWithServers): """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator""" return '{:+03d}:{:02d}'.format(offset//60, offset%60) - # Test websocket reconnection on (un)execpted close + # Test websocket reconnection on (un)expected close def _test_websocket_reconnect(self, close_unexpected): run_test_server.authorize_with('active') events = queue.Queue(100) - logstream = io.BytesIO() + logstream = tutil.StringIO() rootLogger = logging.getLogger() streamHandler = logging.StreamHandler(logstream) rootLogger.addHandler(streamHandler) - filters = [['object_uuid', 'is_a', 'arvados#human']] + filters = [['object_uuid', 'is_a', 'arvados#collection']] filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)]) self.ws = arvados.events.subscribe( arvados.api('v1'), filters, @@ -177,21 +239,21 @@ class WebsocketTest(run_test_server.TestCaseWithServers): self.assertEqual(200, events.get(True, 5)['status']) # create obj - human = arvados.api('v1').humans().create(body={}).execute() + collection = arvados.api('v1').collections().create(body={}).execute() # expect an event - self.assertIn(human['uuid'], events.get(True, 5)['object_uuid']) + self.assertIn(collection['uuid'], events.get(True, 5)['object_uuid']) with self.assertRaises(queue.Empty): self.assertEqual(events.get(True, 2), None) # close (im)properly if close_unexpected: - self.ws.ec.close_connection() + self.ws._client.close() else: self.ws.close() # create one more obj - human2 = arvados.api('v1').humans().create(body={}).execute() + collection2 = arvados.api('v1').collections().create(body={}).execute() # (un)expect the object creation event if close_unexpected: @@ -202,8 +264,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers): log_object_uuids.append(event['object_uuid']) with self.assertRaises(queue.Empty): self.assertEqual(events.get(True, 2), None) - self.assertNotIn(human['uuid'], log_object_uuids) - self.assertIn(human2['uuid'], log_object_uuids) + self.assertNotIn(collection['uuid'], log_object_uuids) + self.assertIn(collection2['uuid'], log_object_uuids) else: with self.assertRaises(queue.Empty): self.assertEqual(events.get(True, 2), None) @@ -225,69 +287,115 @@ class WebsocketTest(run_test_server.TestCaseWithServers): self._test_websocket_reconnect(False) # Test websocket reconnection retry - @mock.patch('arvados.events._EventClient.connect') - def test_websocket_reconnect_retry(self, event_client_connect): - event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None] - - logstream = io.BytesIO() + @mock.patch('arvados.events.ws_client.connect') + def test_websocket_reconnect_retry(self, ws_conn): + logstream = tutil.StringIO() rootLogger = logging.getLogger() streamHandler = logging.StreamHandler(logstream) rootLogger.addHandler(streamHandler) - - run_test_server.authorize_with('active') - events = queue.Queue(100) - - filters = [['object_uuid', 'is_a', 'arvados#human']] - self.ws = arvados.events.subscribe( - arvados.api('v1'), filters, - events.put_nowait, - poll_fallback=False, - last_log_id=None) - self.assertIsInstance(self.ws, arvados.events.EventClient) - - # simulate improper close - self.ws.on_closed() - - # verify log messages to ensure retry happened - log_messages = logstream.getvalue() - found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.") - self.assertNotEqual(found, -1) - rootLogger.removeHandler(streamHandler) - - @mock.patch('arvados.events._EventClient') - def test_subscribe_method(self, websocket_client): - filters = [['object_uuid', 'is_a', 'arvados#human']] - client = arvados.events.EventClient( - self.MOCK_WS_URL, [], lambda event: None, None) - client.subscribe(filters[:], 99) - websocket_client().subscribe.assert_called_with(filters, 99) - - @mock.patch('arvados.events._EventClient') - def test_unsubscribe(self, websocket_client): - filters = [['object_uuid', 'is_a', 'arvados#human']] - client = arvados.events.EventClient( - self.MOCK_WS_URL, filters[:], lambda event: None, None) - client.unsubscribe(filters[:]) - websocket_client().unsubscribe.assert_called_with(filters) - - @mock.patch('arvados.events._EventClient') + try: + msg_event, wss_client, self.ws = self.fake_client(ws_conn) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + ws_conn.side_effect = [Exception('EventClient.connect error'), wss_client] + wss_client.force_disconnect() + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for reconnect callback") + # verify log messages to ensure retry happened + self.assertIn("Error 'EventClient.connect error' during websocket reconnect.", logstream.getvalue()) + self.assertEqual(ws_conn.call_count, 3) + finally: + rootLogger.removeHandler(streamHandler) + + @mock.patch('arvados.events.ws_client.connect') def test_run_forever_survives_reconnects(self, websocket_client): - connected = threading.Event() - websocket_client().connect.side_effect = connected.set client = arvados.events.EventClient( self.MOCK_WS_URL, [], lambda event: None, None) forever_thread = threading.Thread(target=client.run_forever) forever_thread.start() # Simulate an unexpected disconnect, and wait for reconnect. - close_thread = threading.Thread(target=client.on_closed) - close_thread.start() - self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT)) - close_thread.join() - run_forever_alive = forever_thread.is_alive() - client.close() - forever_thread.join() - self.assertTrue(run_forever_alive) - self.assertEqual(2, websocket_client().connect.call_count) + try: + client.on_closed() + self.assertTrue(forever_thread.is_alive()) + self.assertEqual(2, websocket_client.call_count) + finally: + client.close() + forever_thread.join() + + @staticmethod + def fake_client(conn_patch, filters=None, url=MOCK_WS_URL): + """Set up EventClient test infrastructure + + Given a patch of `arvados.events.ws_client.connect`, + this returns a 3-tuple: + + * `msg_event` is a `threading.Event` that is set as the test client + event callback. You can wait for this event to confirm that a + sent message has been acknowledged and processed. + + * `mock_client` is a `mock.Mock` wrapper around `FakeWebsocketClient`. + Use this to assert `EventClient` calls the right methods. It tests + that `EventClient` acquires a lock before calling `send`. + + * `client` is the `EventClient` that uses `mock_client` under the hood + that you exercise methods of. + + Other arguments are passed to initialize `EventClient`. + """ + msg_event = threading.Event() + fake_client = FakeWebsocketClient() + mock_client = fake_client.mock_wrapper() + conn_patch.return_value = mock_client + client = arvados.events.EventClient(url, filters, lambda _: msg_event.set()) + fake_client._check_lock = client._subscribe_lock + return msg_event, mock_client, client + + @mock.patch('arvados.events.ws_client.connect') + def test_subscribe_locking(self, ws_conn): + f = [['created_at', '>=', '2023-12-01T00:00:00.000Z']] + msg_event, wss_client, self.ws = self.fake_client(ws_conn) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + wss_client.send.reset_mock() + self.ws.subscribe(f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for subscribe callback") + wss_client.send.assert_called() + (msg,), _ = wss_client.send.call_args + self.assertEqual( + json.loads(msg), + {'method': 'subscribe', 'filters': f}, + ) + + @mock.patch('arvados.events.ws_client.connect') + def test_unsubscribe_locking(self, ws_conn): + f = [['created_at', '>=', '2023-12-01T01:00:00.000Z']] + msg_event, wss_client, self.ws = self.fake_client(ws_conn, f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + wss_client.send.reset_mock() + self.ws.unsubscribe(f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for unsubscribe callback") + wss_client.send.assert_called() + (msg,), _ = wss_client.send.call_args + self.assertEqual( + json.loads(msg), + {'method': 'unsubscribe', 'filters': f}, + ) + + @mock.patch('arvados.events.ws_client.connect') + def test_resubscribe_locking(self, ws_conn): + f = [['created_at', '>=', '2023-12-01T02:00:00.000Z']] + msg_event, wss_client, self.ws = self.fake_client(ws_conn, f) + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback") + msg_event.clear() + wss_client.send.reset_mock() + wss_client.force_disconnect() + self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for resubscribe callback") + wss_client.send.assert_called() + (msg,), _ = wss_client.send.call_args + self.assertEqual( + json.loads(msg), + {'method': 'subscribe', 'filters': f}, + ) class PollClientTestCase(unittest.TestCase):