8460: Merge branch 'master' into 8460-websocket-go
[arvados.git] / sdk / python / tests / test_events.py
index 7cc35f96e0e1dc358923c5bb239a647f1d841471..7e8c84ec11279495d55fd47770378847886ae76e 100644 (file)
@@ -17,6 +17,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     TIME_FUTURE = time.time()+3600
     MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
 
+    TEST_TIMEOUT = 10.0
+
     def setUp(self):
         self.ws = None
 
@@ -51,21 +53,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self.assertEqual(200, events.get(True, 5)['status'])
         human = arvados.api('v1').humans().create(body={}).execute()
 
-        log_object_uuids = []
-        for i in range(0, expected):
-            log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+        want_uuids = []
         if expected > 0:
-            self.assertIn(human['uuid'], log_object_uuids)
-
+            want_uuids.append(human['uuid'])
         if expected > 1:
-            self.assertIn(ancestor['uuid'], log_object_uuids)
+            want_uuids.append(ancestor['uuid'])
+        log_object_uuids = []
+        while set(want_uuids) - set(log_object_uuids):
+            log_object_uuids.append(events.get(True, 5)['object_uuid'])
 
-        with self.assertRaises(Queue.Empty):
-            # assertEqual just serves to show us what unexpected thing
-            # comes out of the queue when the assertRaises fails; when
-            # the test passes, this assertEqual doesn't get called.
-            self.assertEqual(events.get(True, 2), None)
+        if expected < 2:
+            with self.assertRaises(Queue.Empty):
+                # assertEqual just serves to show us what unexpected
+                # thing comes out of the queue when the assertRaises
+                # fails; when the test passes, this assertEqual
+                # doesn't get called.
+                self.assertEqual(events.get(True, 2), None)
 
     def test_subscribe_websocket(self):
         self._test_subscribe(
@@ -262,20 +265,16 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
     @mock.patch('arvados.events._EventClient')
     def test_run_forever_survives_reconnects(self, websocket_client):
-        connection_cond = threading.Condition()
-        def ws_connect():
-            with connection_cond:
-                connection_cond.notify_all()
-        websocket_client().connect.side_effect = ws_connect
+        connected = threading.Event()
+        websocket_client().connect.side_effect = connected.set
         client = arvados.events.EventClient(
             self.MOCK_WS_URL, [], lambda event: None, None)
-        with connection_cond:
-            forever_thread = threading.Thread(target=client.run_forever)
-            forever_thread.start()
-            # Simulate an unexpected disconnect, and wait for reconnect.
-            close_thread = threading.Thread(target=client.on_closed)
-            close_thread.start()
-            connection_cond.wait()
+        forever_thread = threading.Thread(target=client.run_forever)
+        forever_thread.start()
+        # Simulate an unexpected disconnect, and wait for reconnect.
+        close_thread = threading.Thread(target=client.on_closed)
+        close_thread.start()
+        self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
         close_thread.join()
         run_forever_alive = forever_thread.is_alive()
         client.close()
@@ -285,7 +284,10 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
 
 class PollClientTestCase(unittest.TestCase):
+    TEST_TIMEOUT = 10.0
+
     class MockLogs(object):
+
         def __init__(self):
             self.logs = []
             self.lock = threading.Lock()
@@ -300,12 +302,11 @@ class PollClientTestCase(unittest.TestCase):
                 self.logs = []
             return {'items': retval, 'items_available': len(retval)}
 
-
     def setUp(self):
         self.logs = self.MockLogs()
         self.arv = mock.MagicMock(name='arvados.api()')
         self.arv.logs().list().execute.side_effect = self.logs.return_list
-        self.callback_cond = threading.Condition()
+        self.callback_called = threading.Event()
         self.recv_events = []
 
     def tearDown(self):
@@ -313,9 +314,8 @@ class PollClientTestCase(unittest.TestCase):
             self.client.close(timeout=None)
 
     def callback(self, event):
-        with self.callback_cond:
-            self.recv_events.append(event)
-            self.callback_cond.notify_all()
+        self.recv_events.append(event)
+        self.callback_called.set()
 
     def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
         if filters is None:
@@ -333,11 +333,11 @@ class PollClientTestCase(unittest.TestCase):
         test_log = {'id': 12345, 'testkey': 'testtext'}
         self.logs.add({'id': 123})
         self.build_client(poll_time=.01)
-        with self.callback_cond:
-            self.client.start()
-            self.callback_cond.wait()
-            self.logs.add(test_log.copy())
-            self.callback_cond.wait()
+        self.client.start()
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
+        self.callback_called.clear()
+        self.logs.add(test_log.copy())
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
         self.client.close(timeout=None)
         self.assertIn(test_log, self.recv_events)
 
@@ -345,9 +345,8 @@ class PollClientTestCase(unittest.TestCase):
         client_filter = ['kind', '=', 'arvados#test']
         self.build_client()
         self.client.subscribe([client_filter[:]])
-        with self.callback_cond:
-            self.client.start()
-            self.callback_cond.wait()
+        self.client.start()
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
         self.client.close(timeout=None)
         self.assertTrue(self.was_filter_used(client_filter))
 
@@ -362,11 +361,10 @@ class PollClientTestCase(unittest.TestCase):
 
     def test_run_forever(self):
         self.build_client()
-        with self.callback_cond:
-            self.client.start()
-            forever_thread = threading.Thread(target=self.client.run_forever)
-            forever_thread.start()
-            self.callback_cond.wait()
+        self.client.start()
+        forever_thread = threading.Thread(target=self.client.run_forever)
+        forever_thread.start()
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
         self.assertTrue(forever_thread.is_alive())
         self.client.close()
         forever_thread.join()