self.filters = [[]]
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
- self.is_closed = False
+ self.is_closed = threading.Event()
self._setup_event_client()
def _setup_event_client(self):
self.ec.close_connection()
raise
- def connect(self):
- self.ec.connect()
-
- def close_connection(self):
- self.ec.close_connection()
-
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
self.ec.subscribe(f, last_log_id)
self.ec.unsubscribe(f)
def close(self, code=1000, reason='', timeout=0):
- self.is_closed = True
+ self.is_closed.set()
self.ec.close(code, reason, timeout)
def on_event(self, m):
thread.interrupt_main()
def on_closed(self):
- if self.is_closed == False:
+ if not self.is_closed.is_set():
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
self._setup_event_client()
+ _logger.warn("Reconnect successful.")
break
except Exception as e:
_logger.warn("Error '%s' during websocket reconnect.", e)
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
- self.is_closed = True
+ self.is_closed.set()
thread.interrupt_main()
return
+ def run_forever(self):
+ # Have to poll here to let KeyboardInterrupt get raised.
+ while not self.is_closed.wait(1):
+ pass
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):