+ with self._closing_lock:
+ if not self._closing:
+ self.on_event(json.loads(str(m)))
+
+    def close(self, code=1000, reason='', timeout=0):
+        """Close the event client and optionally wait for the shutdown.
+
+        The close request is handed to the parent class first.  The client
+        is then flagged as closing, after which incoming messages are no
+        longer passed on to ``on_event``.
+
+        :param code: close code forwarded to the parent ``close``.
+        :param reason: close reason forwarded to the parent ``close``.
+        :param timeout: number of seconds to wait for ws4py to indicate
+            that the connection has closed; passed unchanged to
+            ``self._closed.wait``.
+        """
+        super(_EventClient, self).close(code, reason)
+        with self._closing_lock:
+            # From here on the message handler sees the flag and drops
+            # whatever still arrives.
+            self._closing = True
+        # Block (for at most ``timeout`` seconds) until ws4py reports that
+        # the connection is closed.  The lock is not held while waiting.
+        self._closed.wait(timeout=timeout)
+
+    def subscribe(self, f, last_log_id=None):
+        """Send a ``subscribe`` request for the filters *f*.
+
+        :param f: filters placed under the ``"filters"`` key of the
+            request.
+        :param last_log_id: when not ``None``, added to the request under
+            the ``"last_log_id"`` key; otherwise the key is left out.
+        """
+        request = {"method": "subscribe", "filters": f}
+        if last_log_id is not None:
+            request["last_log_id"] = last_log_id
+        payload = json.dumps(request)
+        self.send(payload)
+
+    def unsubscribe(self, f):
+        """Send an ``unsubscribe`` request for the filters *f*.
+
+        :param f: filters placed under the ``"filters"`` key of the
+            request.
+        """
+        request = {"method": "unsubscribe", "filters": f}
+        self.send(json.dumps(request))
+
+
+class EventClient(object):
+    def __init__(self, url, filters, on_event_cb, last_log_id):
+        """Store the connection settings and connect straight away.
+
+        :param url: passed to the underlying ``_EventClient``.
+        :param filters: initial filters; stored as the single entry of
+            ``self.filters``.  A falsy value is stored as an empty list,
+            giving ``[[]]``.
+        :param on_event_cb: callback kept on ``self.on_event_cb``.
+        :param last_log_id: passed to the underlying ``_EventClient``.
+
+        Any exception raised while connecting propagates out of the
+        constructor (see ``_setup_event_client``).
+        """
+        self.url = url
+        self.on_event_cb = on_event_cb
+        self.last_log_id = last_log_id
+        # ``self.filters`` is a list of filter sets; the one given here is
+        # always its first element.
+        self.filters = [filters if filters else []]
+        # Set by ``close()``.
+        self.is_closed = threading.Event()
+        self._setup_event_client()
+
+    def _setup_event_client(self):
+        """Create the underlying ``_EventClient`` and connect it.
+
+        The new client is stored on ``self.ec`` and marked as a daemon
+        before connecting.  If ``connect()`` raises, the client's
+        connection is closed and the exception is re-raised.
+        """
+        # ``self.on_event`` and ``self.on_closed`` are attributes of this
+        # class that are handed to the client (presumably as callbacks).
+        self.ec = _EventClient(
+            self.url,
+            self.filters,
+            self.on_event,
+            self.last_log_id,
+            self.on_closed,
+        )
+        self.ec.daemon = True
+        try:
+            self.ec.connect()
+        except Exception:
+            # Clean up the half-open connection, then let the caller see
+            # the original error.
+            self.ec.close_connection()
+            raise
+
+    def subscribe(self, f, last_log_id=None):
+        """Record the filters *f* and subscribe to them on the connection.
+
+        :param f: filters appended to ``self.filters`` and forwarded to
+            the underlying client.
+        :param last_log_id: forwarded unchanged to the underlying client's
+            ``subscribe``.
+        """
+        # Record first, then send, so ``self.filters`` includes ``f`` even
+        # if the send raises.
+        self.filters.append(f)
+        self.ec.subscribe(f, last_log_id=last_log_id)
+
+    def unsubscribe(self, f):
+        """Forget the filters *f* and unsubscribe from them.
+
+        The first entry of ``self.filters`` equal to *f* is removed before
+        the request is forwarded to the underlying client.
+
+        :param f: filters to drop.
+        :raises ValueError: if *f* is not in ``self.filters``; nothing is
+            sent in that case.
+        """
+        self.filters.remove(f)
+        self.ec.unsubscribe(f)
+
+    def close(self, code=1000, reason='', timeout=0):
+        """Mark this client as closed and close the underlying connection.
+
+        :param code: close code forwarded to the underlying client.
+        :param reason: close reason forwarded to the underlying client.
+        :param timeout: number of seconds the underlying client may wait
+            for the connection to be reported closed.
+        """
+        # Set the flag before closing so it is already visible while the
+        # connection shuts down.
+        self.is_closed.set()
+        self.ec.close(code=code, reason=reason, timeout=timeout)