Merge branch 'master' into 9352-many-nodes-make-workbench-faster
[arvados.git] / sdk / python / arvados / events.py
index d88897f1234329b5294bedb1da4e4104f9720b4e..a1b46384394d84ad6b68e5c6971cc890823a9d12 100644 (file)
@@ -83,14 +83,18 @@ class EventClient(object):
             self.filters = [[]]
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
-        self.is_closed = False
-        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+        self.is_closed = threading.Event()
+        self._setup_event_client()
 
-    def connect(self):
-        self.ec.connect()
-
-    def close_connection(self):
-        self.ec.close_connection()
+    def _setup_event_client(self):
+        self.ec = _EventClient(self.url, self.filters, self.on_event,
+                               self.last_log_id, self.on_closed)
+        self.ec.daemon = True
+        try:
+            self.ec.connect()
+        except Exception:
+            self.ec.close_connection()
+            raise
 
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
@@ -101,7 +105,7 @@ class EventClient(object):
         self.ec.unsubscribe(f)
 
     def close(self, code=1000, reason='', timeout=0):
-        self.is_closed = True
+        self.is_closed.set()
         self.ec.close(code, reason, timeout)
 
     def on_event(self, m):
@@ -114,21 +118,26 @@ class EventClient(object):
             thread.interrupt_main()
 
     def on_closed(self):
-        if self.is_closed == False:
+        if not self.is_closed.is_set():
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
-                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-                    self.ec.connect()
+                    self._setup_event_client()
+                    _logger.warn("Reconnect successful.")
                     break
                 except Exception as e:
                     _logger.warn("Error '%s' during websocket reconnect.", e)
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
-                self.is_closed = True
+                self.is_closed.set()
                 thread.interrupt_main()
                 return
 
+    def run_forever(self):
+        # Have to poll here to let KeyboardInterrupt get raised.
+        while not self.is_closed.wait(1):
+            pass
+
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
@@ -250,20 +259,14 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     if not endpoint:
         raise errors.FeatureNotEnabledError(
             "Server does not advertise a websocket endpoint")
+    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
     try:
-        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
-        ok = False
-        try:
-            client.connect()
-            ok = True
-            return client
-        finally:
-            if not ok:
-                client.close_connection()
-    except:
+    except Exception:
         _logger.warn("Failed to connect to websockets on %s" % endpoint)
         raise
+    else:
+        return client
 
 
 def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):