Merge branch '10200-cwl-crunch-script' closes #10200
[arvados.git] / sdk / python / arvados / events.py
index 68cfa2187076725a51af070a0daa68d790e31f3b..a1b46384394d84ad6b68e5c6971cc890823a9d12 100644 (file)
@@ -83,7 +83,7 @@ class EventClient(object):
             self.filters = [[]]
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
-        self.is_closed = False
+        self.is_closed = threading.Event()
         self._setup_event_client()
 
     def _setup_event_client(self):
@@ -96,12 +96,6 @@ class EventClient(object):
             self.ec.close_connection()
             raise
 
-    def connect(self):
-        self.ec.connect()
-
-    def close_connection(self):
-        self.ec.close_connection()
-
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
         self.ec.subscribe(f, last_log_id)
@@ -111,7 +105,7 @@ class EventClient(object):
         self.ec.unsubscribe(f)
 
     def close(self, code=1000, reason='', timeout=0):
-        self.is_closed = True
+        self.is_closed.set()
         self.ec.close(code, reason, timeout)
 
     def on_event(self, m):
@@ -124,20 +118,26 @@ class EventClient(object):
             thread.interrupt_main()
 
     def on_closed(self):
-        if self.is_closed == False:
+        if not self.is_closed.is_set():
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
                     self._setup_event_client()
+                    _logger.warn("Reconnect successful.")
                     break
                 except Exception as e:
                     _logger.warn("Error '%s' during websocket reconnect.", e)
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
-                self.is_closed = True
+                self.is_closed.set()
                 thread.interrupt_main()
                 return
 
+    def run_forever(self):
+        # Have to poll here to let KeyboardInterrupt get raised.
+        while not self.is_closed.wait(1):
+            pass
+
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):