9135: Bring EventClient's public interface closer to PollClient's.
author Brett Smith <brett@curoverse.com>
Mon, 9 May 2016 16:54:23 +0000 (12:54 -0400)
committer Brett Smith <brett@curoverse.com>
Fri, 13 May 2016 14:54:37 +0000 (10:54 -0400)
* Restore the run_forever method, which was previously inherited from
  WebSocketClient.
* Remove the connect and close_connection methods, which are
  WebSocketClient implementation details that don't make sense as part
  of the public interface.  (A running EventClient will just reconnect
  if you call close_connection on it.)

sdk/python/arvados/events.py
sdk/python/tests/test_events.py

index 68cfa2187076725a51af070a0daa68d790e31f3b..81a9b36182a8545adbdcd3fd6afec7f0fba53602 100644 (file)
@@ -83,7 +83,7 @@ class EventClient(object):
             self.filters = [[]]
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
-        self.is_closed = False
+        self.is_closed = threading.Event()
         self._setup_event_client()
 
     def _setup_event_client(self):
@@ -96,12 +96,6 @@ class EventClient(object):
             self.ec.close_connection()
             raise
 
-    def connect(self):
-        self.ec.connect()
-
-    def close_connection(self):
-        self.ec.close_connection()
-
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
         self.ec.subscribe(f, last_log_id)
@@ -111,7 +105,7 @@ class EventClient(object):
         self.ec.unsubscribe(f)
 
     def close(self, code=1000, reason='', timeout=0):
-        self.is_closed = True
+        self.is_closed.set()
         self.ec.close(code, reason, timeout)
 
     def on_event(self, m):
@@ -124,7 +118,7 @@ class EventClient(object):
             thread.interrupt_main()
 
     def on_closed(self):
-        if self.is_closed == False:
+        if not self.is_closed.is_set():
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
@@ -134,10 +128,15 @@ class EventClient(object):
                     _logger.warn("Error '%s' during websocket reconnect.", e)
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
-                self.is_closed = True
+                self.is_closed.set()
                 thread.interrupt_main()
                 return
 
+    def run_forever(self):
+        # Have to poll here to let KeyboardInterrupt get raised.
+        while not self.is_closed.wait(1):
+            pass
+
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
index 7b69fa2cfb23e368082db3976b85ad2ffc9fb93a..f2cdba28c775a523bc178052644cb4a76dac2771 100644 (file)
@@ -176,7 +176,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # close (im)properly
         if close_unexpected:
-            self.ws.close_connection()
+            self.ws.ec.close_connection()
         else:
             self.ws.close()
 
@@ -260,13 +260,28 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         client.unsubscribe(filters[:])
         websocket_client().unsubscribe.assert_called_with(filters)
 
-    @unittest.expectedFailure
     @mock.patch('arvados.events._EventClient')
-    def test_run_forever(self, websocket_client):
+    def test_run_forever_survives_reconnects(self, websocket_client):
+        connection_cond = threading.Condition()
+        def ws_connect():
+            with connection_cond:
+                connection_cond.notify_all()
+        websocket_client().connect.side_effect = ws_connect
         client = arvados.events.EventClient(
             self.MOCK_WS_URL, [], lambda event: None, None)
-        client.run_forever()
-        websocket_client().run_forever.assert_called_with()
+        with connection_cond:
+            forever_thread = threading.Thread(target=client.run_forever)
+            forever_thread.start()
+            # Simulate an unexpected disconnect, and wait for reconnect.
+            close_thread = threading.Thread(target=client.on_closed)
+            close_thread.start()
+            connection_cond.wait()
+        close_thread.join()
+        run_forever_alive = forever_thread.is_alive()
+        client.close()
+        forever_thread.join()
+        self.assertTrue(run_forever_alive)
+        self.assertEqual(2, websocket_client().connect.call_count)
 
 
 class PollClientTestCase(unittest.TestCase):