X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ec38d4ae1a6597e1f35d5cd0b25f6384666c55be..f3e02106cfc33ffe333af9e303a9e68f3ecfb2e4:/sdk/python/tests/test_events.py?ds=sidebyside diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py index 7b69fa2cfb..7e8c84ec11 100644 --- a/sdk/python/tests/test_events.py +++ b/sdk/python/tests/test_events.py @@ -17,6 +17,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers): TIME_FUTURE = time.time()+3600 MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST) + TEST_TIMEOUT = 10.0 + def setUp(self): self.ws = None @@ -51,21 +53,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers): self.assertEqual(200, events.get(True, 5)['status']) human = arvados.api('v1').humans().create(body={}).execute() - log_object_uuids = [] - for i in range(0, expected): - log_object_uuids.append(events.get(True, 5)['object_uuid']) - + want_uuids = [] if expected > 0: - self.assertIn(human['uuid'], log_object_uuids) - + want_uuids.append(human['uuid']) if expected > 1: - self.assertIn(ancestor['uuid'], log_object_uuids) + want_uuids.append(ancestor['uuid']) + log_object_uuids = [] + while set(want_uuids) - set(log_object_uuids): + log_object_uuids.append(events.get(True, 5)['object_uuid']) - with self.assertRaises(Queue.Empty): - # assertEqual just serves to show us what unexpected thing - # comes out of the queue when the assertRaises fails; when - # the test passes, this assertEqual doesn't get called. - self.assertEqual(events.get(True, 2), None) + if expected < 2: + with self.assertRaises(Queue.Empty): + # assertEqual just serves to show us what unexpected + # thing comes out of the queue when the assertRaises + # fails; when the test passes, this assertEqual + # doesn't get called. + self.assertEqual(events.get(True, 2), None) def test_subscribe_websocket(self): self._test_subscribe( @@ -143,8 +146,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers): return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60) def isotz(self, offset): - """Convert minutes-east-of-UTC to ISO8601 time zone designator""" - return '{:+03d}{:02d}'.format(offset/60, offset%60) + """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator""" + return '{:+03d}:{:02d}'.format(offset/60, offset%60) # Test websocket reconnection on (un)execpted close def _test_websocket_reconnect(self, close_unexpected): @@ -176,7 +179,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers): # close (im)properly if close_unexpected: - self.ws.close_connection() + self.ws.ec.close_connection() else: self.ws.close() @@ -260,17 +263,31 @@ class WebsocketTest(run_test_server.TestCaseWithServers): client.unsubscribe(filters[:]) websocket_client().unsubscribe.assert_called_with(filters) - @unittest.expectedFailure @mock.patch('arvados.events._EventClient') - def test_run_forever(self, websocket_client): + def test_run_forever_survives_reconnects(self, websocket_client): + connected = threading.Event() + websocket_client().connect.side_effect = connected.set client = arvados.events.EventClient( self.MOCK_WS_URL, [], lambda event: None, None) - client.run_forever() - websocket_client().run_forever.assert_called_with() + forever_thread = threading.Thread(target=client.run_forever) + forever_thread.start() + # Simulate an unexpected disconnect, and wait for reconnect. + close_thread = threading.Thread(target=client.on_closed) + close_thread.start() + self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT)) + close_thread.join() + run_forever_alive = forever_thread.is_alive() + client.close() + forever_thread.join() + self.assertTrue(run_forever_alive) + self.assertEqual(2, websocket_client().connect.call_count) class PollClientTestCase(unittest.TestCase): + TEST_TIMEOUT = 10.0 + class MockLogs(object): + def __init__(self): self.logs = [] self.lock = threading.Lock() @@ -285,12 +302,11 @@ class PollClientTestCase(unittest.TestCase): self.logs = [] return {'items': retval, 'items_available': len(retval)} - def setUp(self): self.logs = self.MockLogs() self.arv = mock.MagicMock(name='arvados.api()') self.arv.logs().list().execute.side_effect = self.logs.return_list - self.callback_cond = threading.Condition() + self.callback_called = threading.Event() self.recv_events = [] def tearDown(self): @@ -298,9 +314,8 @@ class PollClientTestCase(unittest.TestCase): self.client.close(timeout=None) def callback(self, event): - with self.callback_cond: - self.recv_events.append(event) - self.callback_cond.notify_all() + self.recv_events.append(event) + self.callback_called.set() def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99): if filters is None: @@ -318,11 +333,11 @@ class PollClientTestCase(unittest.TestCase): test_log = {'id': 12345, 'testkey': 'testtext'} self.logs.add({'id': 123}) self.build_client(poll_time=.01) - with self.callback_cond: - self.client.start() - self.callback_cond.wait() - self.logs.add(test_log.copy()) - self.callback_cond.wait() + self.client.start() + self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT)) + self.callback_called.clear() + self.logs.add(test_log.copy()) + self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT)) self.client.close(timeout=None) self.assertIn(test_log, self.recv_events) @@ -330,9 +345,8 @@ class PollClientTestCase(unittest.TestCase): client_filter = ['kind', '=', 'arvados#test'] self.build_client() self.client.subscribe([client_filter[:]]) - with self.callback_cond: - self.client.start() - self.callback_cond.wait() + self.client.start() + self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT)) self.client.close(timeout=None) self.assertTrue(self.was_filter_used(client_filter)) @@ -347,11 +361,10 @@ class PollClientTestCase(unittest.TestCase): def test_run_forever(self): self.build_client() - with self.callback_cond: - self.client.start() - forever_thread = threading.Thread(target=self.client.run_forever) - forever_thread.start() - self.callback_cond.wait() + self.client.start() + forever_thread = threading.Thread(target=self.client.run_forever) + forever_thread.start() + self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT)) self.assertTrue(forever_thread.is_alive()) self.client.close() forever_thread.join()