self.filters = filters
self.on_event = on_event
self.last_log_id = last_log_id
+ self._closed_lock = threading.RLock()
+ self._closed = False
def opened(self):
self.subscribe(self.filters, self.last_log_id)
def received_message(self, m):
- self.on_event(json.loads(str(m)))
+ with self._closed_lock:
+ if not self._closed:
+ self.on_event(json.loads(str(m)))
- def close_connection(self):
- try:
- self.sock.shutdown(socket.SHUT_RDWR)
- self.sock.close()
- except:
- pass
+ def close(self, code=1000, reason=''):
+ """Close event client and wait for it to finish."""
+ super(EventClient, self).close(code, reason)
+ with self._closed_lock:
+ # make sure we don't process any more messages.
+ self._closed = True
def subscribe(self, filters, last_log_id=None):
m = {"method": "subscribe", "filters": filters}
self.stop.wait(1)
def close(self):
+ """Close poll client and wait for it to finish."""
+
self.stop.set()
try:
self.join()