Merge branch '9135-eventclient-run-forever-wip'
[arvados.git] / sdk / python / tests / test_events.py
index c637efe3422a46a8f3aa51c2001d41cefb023d3c..f2cdba28c775a523bc178052644cb4a76dac2771 100644 (file)
@@ -1,14 +1,9 @@
 import arvados
-import arvados.events
-import arvados.errors
-from datetime import datetime, timedelta, tzinfo
+import io
 import logging
-import logging.handlers
 import mock
 import Queue
 import run_test_server
-import StringIO
-import tempfile
 import threading
 import time
 import unittest
@@ -156,7 +151,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         run_test_server.authorize_with('active')
         events = Queue.Queue(100)
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)
@@ -181,7 +176,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # close (im)properly
         if close_unexpected:
-            self.ws.close_connection()
+            self.ws.ec.close_connection()
         else:
             self.ws.close()
 
@@ -224,7 +219,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def test_websocket_reconnect_retry(self, event_client_connect):
         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)
@@ -265,13 +260,28 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         client.unsubscribe(filters[:])
         websocket_client().unsubscribe.assert_called_with(filters)
 
-    @unittest.expectedFailure
     @mock.patch('arvados.events._EventClient')
-    def test_run_forever(self, websocket_client):
+    def test_run_forever_survives_reconnects(self, websocket_client):
+        connection_cond = threading.Condition()
+        def ws_connect():
+            with connection_cond:
+                connection_cond.notify_all()
+        websocket_client().connect.side_effect = ws_connect
         client = arvados.events.EventClient(
             self.MOCK_WS_URL, [], lambda event: None, None)
-        client.run_forever()
-        websocket_client().run_forever.assert_called_with()
+        with connection_cond:
+            forever_thread = threading.Thread(target=client.run_forever)
+            forever_thread.start()
+            # Simulate an unexpected disconnect, and wait for reconnect.
+            close_thread = threading.Thread(target=client.on_closed)
+            close_thread.start()
+            connection_cond.wait()
+        close_thread.join()
+        run_forever_alive = forever_thread.is_alive()
+        client.close()
+        forever_thread.join()
+        self.assertTrue(run_forever_alive)
+        self.assertEqual(2, websocket_client().connect.call_count)
 
 
 class PollClientTestCase(unittest.TestCase):