7658: update EventClient.on_closed to retry on connect errors.
author radhika <radhika@curoverse.com>
Tue, 5 Apr 2016 16:00:19 +0000 (12:00 -0400)
committer radhika <radhika@curoverse.com>
Tue, 5 Apr 2016 16:00:19 +0000 (12:00 -0400)
sdk/python/arvados/events.py
sdk/python/tests/test_websockets.py

index fcc3feb00d8a15a16aca9ca1d5b9ba57c81ae580..57bf3ae1cb56406dd59400d8bbb7bd9b972f6641 100644 (file)
@@ -15,7 +15,7 @@ _logger = logging.getLogger('arvados.events')
 
 
 class _EventClient(WebSocketClient):
-    def __init__(self, url, filters, on_event, last_log_id, on_closed=None):
+    def __init__(self, url, filters, on_event, last_log_id, on_closed):
         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
         if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
             ssl_options['cert_reqs'] = ssl.CERT_NONE
@@ -27,7 +27,11 @@ class _EventClient(WebSocketClient):
         # will be attempted -- and it might not be the right one. See
         # ws4py's WebSocketBaseClient.__init__.
         super(_EventClient, self).__init__(url, ssl_options=ssl_options)
-        self.filters = filters
+
+        if filters:
+            self.filters = filters
+        else:
+            self.filters = [[]]
         self.on_event = on_event
         self.last_log_id = last_log_id
         self._closing_lock = threading.RLock()
@@ -78,7 +82,6 @@ class EventClient(object):
         self.last_log_id = last_log_id
         self.is_closed = False
         self.subscriptions = {}
-        self.subscriptions[str(filters)] = filters
         self.ec = _EventClient(url, filters, self.on_event, last_log_id, self.on_closed)
 
     def connect(self):
@@ -106,11 +109,24 @@ class EventClient(object):
 
     def on_closed(self):
+        """Reconnect after the underlying _EventClient closes.
+
+        Unless self.is_closed is set, build a new _EventClient, connect it and
+        re-apply every stored subscription, retrying every 5 seconds until this
+        succeeds.
+        """
         if self.is_closed == False:
-            filters = []
-            for s in self.subscriptions:
-                filters.append(self.subscriptions[s])
-            self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-            self.ec.connect()
+            while True:
+                try:
+                    # A failed connect() can leave the socket unusable, so each
+                    # attempt builds a fresh _EventClient instead of reusing one.
+                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
+                    self.ec.connect()
+                    for s in self.subscriptions:
+                        self.ec.subscribe(self.subscriptions[s], self.last_log_id)
+                    break
+                except Exception:
+                    _logger.warning("Failed to reconnect to websockets on %s. Will retry after 5s.", self.url)
+                    time.sleep(5)
 
 
 class PollClient(threading.Thread):
index 2049f56c0c903557123c78824ca72b652c83ba71..98d84ade759a95f7f72844b61e46c05d66f8f7e9 100644 (file)
@@ -144,15 +144,15 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         with self.assertRaises(Queue.Empty):
             self.assertEqual(events.get(True, 2), None)
 
-        # create one more obj
-        human2 = arvados.api('v1').humans().create(body={}).execute()
-
         # close (im)properly
         if close_unexpected:
             self.ws.close_connection()
         else:
             self.ws.close()
 
+        # create one more obj
+        human2 = arvados.api('v1').humans().create(body={}).execute()
+
         # (un)expect the object creation event
         if close_unexpected:
             log_object_uuids = []