X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/970907f28866a09a9fe95da48dffa6cd34ab4dca..4ab126d1574b7db2fdb5b0cea253b2df28d7b130:/sdk/python/tests/test_websockets.py

diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index 37b644aaf1..907dd93100 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -1,9 +1,13 @@
 import arvados
 import arvados.events
 from datetime import datetime, timedelta, tzinfo
+import logging
+import logging.handlers
 import mock
 import Queue
 import run_test_server
+import StringIO
+import tempfile
 import threading
 import time
 import unittest
@@ -18,8 +22,12 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self.ws = None
 
     def tearDown(self):
-        if self.ws:
-            self.ws.close()
+        try:
+            if self.ws:
+                self.ws.close()
+        except Exception as e:
+            # Best effort: still run the base tearDown and server reset.
+            print("Error in teardown: {}".format(e))
         super(WebsocketTest, self).tearDown()
         run_test_server.reset()
 
@@ -120,3 +128,112 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def isotz(self, offset):
         """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
         return '{:+03d}{:02d}'.format(offset/60, offset%60)
+
+    def _test_websocket_reconnect(self, close_unexpected):
+        """Exercise websocket behavior after an (un)expected close.
+
+        close_unexpected -- if true, drop the connection with
+        close_connection(), then expect an event for an object
+        created afterwards and the "Unexpected close. Reconnecting."
+        message in the log. If false, close the client with
+        close(), then expect no further event and no such log
+        message.
+        """
+        run_test_server.authorize_with('active')
+        events = Queue.Queue(100)
+
+        # Capture root logger output; addCleanup detaches the handler
+        # even when an assertion below fails.
+        logstream = StringIO.StringIO()
+        rootLogger = logging.getLogger()
+        streamHandler = logging.StreamHandler(logstream)
+        rootLogger.addHandler(streamHandler)
+        self.addCleanup(rootLogger.removeHandler, streamHandler)
+
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
+        self.ws = arvados.events.subscribe(
+            arvados.api('v1'), filters,
+            events.put_nowait,
+            poll_fallback=False,
+            last_log_id=None)
+        self.assertIsInstance(self.ws, arvados.events.EventClient)
+        self.assertEqual(200, events.get(True, 5)['status'])
+
+        # create obj
+        human = arvados.api('v1').humans().create(body={}).execute()
+
+        # expect an event
+        self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
+        with self.assertRaises(Queue.Empty):
+            self.assertEqual(events.get(True, 2), None)
+
+        # close (im)properly
+        if close_unexpected:
+            self.ws.close_connection()
+        else:
+            self.ws.close()
+
+        # create one more obj
+        human2 = arvados.api('v1').humans().create(body={}).execute()
+
+        # (un)expect the object creation event
+        if close_unexpected:
+            log_object_uuids = []
+            for _ in range(2):
+                event = events.get(True, 5)
+                if event.get('object_uuid') is not None:
+                    log_object_uuids.append(event['object_uuid'])
+            with self.assertRaises(Queue.Empty):
+                self.assertEqual(events.get(True, 2), None)
+            self.assertNotIn(human['uuid'], log_object_uuids)
+            self.assertIn(human2['uuid'], log_object_uuids)
+        else:
+            with self.assertRaises(Queue.Empty):
+                self.assertEqual(events.get(True, 2), None)
+
+        # verify that the reconnect message was logged only after an
+        # unexpected close
+        log_messages = logstream.getvalue()
+        if close_unexpected:
+            self.assertIn("Unexpected close. Reconnecting.", log_messages)
+        else:
+            self.assertNotIn("Unexpected close. Reconnecting.", log_messages)
+
+    def test_websocket_reconnect_on_unexpected_close(self):
+        self._test_websocket_reconnect(True)
+
+    def test_websocket_no_reconnect_on_close_by_user(self):
+        self._test_websocket_reconnect(False)
+
+    # Test websocket reconnection retry
+    @mock.patch('arvados.events._EventClient.connect')
+    def test_websocket_reconnect_retry(self, event_client_connect):
+        # Mocked connect(): 1st call returns, 2nd raises, 3rd returns.
+        event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
+
+        # Capture root logger output; addCleanup detaches the handler
+        # even when an assertion below fails.
+        logstream = StringIO.StringIO()
+        rootLogger = logging.getLogger()
+        streamHandler = logging.StreamHandler(logstream)
+        rootLogger.addHandler(streamHandler)
+        self.addCleanup(rootLogger.removeHandler, streamHandler)
+
+        run_test_server.authorize_with('active')
+        events = Queue.Queue(100)
+
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        self.ws = arvados.events.subscribe(
+            arvados.api('v1'), filters,
+            events.put_nowait,
+            poll_fallback=False,
+            last_log_id=None)
+        self.assertIsInstance(self.ws, arvados.events.EventClient)
+
+        # simulate improper close
+        self.ws.on_closed()
+
+        # verify log messages to ensure retry happened
+        log_messages = logstream.getvalue()
+        self.assertIn("Error 'EventClient.connect error' during websocket reconnect. Will retry", log_messages)