9135: Make EventClient initialization more consistent. 9135-restore-eventclient-api-wip
authorBrett Smith <brett@curoverse.com>
Thu, 5 May 2016 21:25:39 +0000 (17:25 -0400)
committerBrett Smith <brett@curoverse.com>
Thu, 5 May 2016 21:25:39 +0000 (17:25 -0400)
* DRY up the setup code.
* Make the client a daemon thread, for consistency with PollClient.
* Always try to close the connection on failure.  We do this in
  _subscribe_websocket, so why not do it on reconnect as well?
* Remove the public connect() and close_connection() methods from
  EventClient.  They're no longer needed, and avoid leaking
  implementation details.

sdk/python/arvados/events.py
sdk/python/tests/test_events.py

index 8f7ae699cde7b8b9d369911ad5ffac33502f34da..96d784046e3a8c6eb77b1a52f75b3d0b6c29d665 100644 (file)
@@ -84,7 +84,17 @@ class EventClient(object):
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
         self.is_closed = False
-        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+        self._setup_event_client()
+
+    def _setup_event_client(self):
+        self.ec = _EventClient(self.url, self.filters, self.on_event,
+                               self.last_log_id, self.on_closed)
+        self.ec.daemon = True
+        try:
+            self.ec.connect()
+        except Exception:
+            self.ec.close_connection()
+            raise
 
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
@@ -112,8 +122,7 @@ class EventClient(object):
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
-                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-                    self.ec.connect()
+                    self._setup_event_client()
                     break
                 except Exception as e:
                     _logger.warn("Error '%s' during websocket reconnect.", e)
@@ -123,10 +132,8 @@ class EventClient(object):
                 thread.interrupt_main()
                 return
 
-    def _delegate_to_ec(attr_name):
-        return property(lambda self: getattr(self.ec, attr_name))
-    for _method_name in ['connect', 'close_connection', 'run_forever']:
-        locals()[_method_name] = _delegate_to_ec(_method_name)
+    def run_forever(self):
+        return self.ec.run_forever()
 
 
 class PollClient(threading.Thread):
@@ -249,18 +256,10 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     if not endpoint:
         raise errors.FeatureNotEnabledError(
             "Server does not advertise a websocket endpoint")
+    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
     try:
-        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
-        ok = False
-        try:
-            client.connect()
-            ok = True
-            return client
-        finally:
-            if not ok:
-                client.close_connection()
-    except:
+    except Exception:
         _logger.warn("Failed to connect to websockets on %s" % endpoint)
         raise
 
index 81beba59b7d61c13e13ac983e7d5e8639550b748..23bd024adc8466f100363b2d725768bc49a59723 100644 (file)
@@ -26,6 +26,14 @@ class EventClientTestCase(unittest.TestCase):
             yield arvados.events.EventClient(TEST_WS_URL, filters, callback,
                                              last_log_id), ws_mock
 
+    def test_client_init(self):
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            ws_mock.assert_called_with(TEST_WS_URL, [[]], client.on_event, None,
+                                       client.on_closed)
+            ws_mock().connect.assert_called()
+            self.assertIs(ws_mock().daemon, True)
+
     def test_subscribe_calls_ws(self):
         ws_filter = ['kind', '=', 'arvados#test']
         with self.mocked_client() as client_tuple:
@@ -41,19 +49,10 @@ class EventClientTestCase(unittest.TestCase):
             client.unsubscribe(ws_filter)
             ws_mock().unsubscribe.assert_called_with(ws_filter)
 
-    # PollClient doesn't have this method, but for now you have to call it
-    # for anything to work.
-    def test_connect_calls_ws(self):
-        with self.mocked_client() as client_tuple:
-            client, ws_mock = client_tuple
-            client.connect()
-            ws_mock().connect.assert_called()
-
     def test_close_calls_ws(self):
         with self.mocked_client() as client_tuple:
             client, ws_mock = client_tuple
             ws_mock().close.side_effect = lambda *args: client.on_closed()
-            client.connect()
             client.close()
             ws_mock().close.assert_called()
             # Check on_closed did not try to reconnect.
@@ -62,6 +61,12 @@ class EventClientTestCase(unittest.TestCase):
     def test_run_forever_calls_ws(self):
         with self.mocked_client() as client_tuple:
             client, ws_mock = client_tuple
-            client.connect()
             client.run_forever()
             ws_mock().run_forever.assert_called()
+
+    def test_reconnect_rebuilds_and_reconnects_underlying_client(self):
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            client.on_closed()
+            self.assertEqual(2, ws_mock.call_count)
+            self.assertEqual(2, ws_mock().connect.call_count)