def unsubscribe(self, f):
    """Send an "unsubscribe" request for filter ``f`` through ``self.send``.

    The request is the JSON encoding of
    ``{"method": "unsubscribe", "filters": f}``.
    """
    request = {"method": "unsubscribe", "filters": f}
    payload = json.dumps(request)
    self.send(payload)
+
+
class EventClient(object):
    """Wrapper around ``_EventClient`` that reconnects after unexpected closes.

    The wrapper remembers the subscribed filters and the id of the last
    event it saw, so that a replacement ``_EventClient`` can be built with
    the same state when the current one closes without ``close()`` having
    been called on this object.
    """

    # Seconds to wait between two reconnect attempts in ``on_closed``.
    RECONNECT_DELAY = 5

    def __init__(self, url, filters, on_event_cb, last_log_id):
        """Store the connection parameters and build the first ``_EventClient``.

        :param url: passed through unchanged to ``_EventClient``.
        :param filters: initial filter; a falsy value is replaced by an
            empty filter (``[]``). Either way it becomes the first entry
            of ``self.filters``.
        :param on_event_cb: callable invoked with every event passed to
            ``on_event``.
        :param last_log_id: initial value of ``self.last_log_id``.
        """
        self.url = url
        self.filters = [filters] if filters else [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        self.is_closed = False
        self.ec = self._new_client()

    def _new_client(self):
        """Build an ``_EventClient`` from the current url/filters/last id."""
        return _EventClient(self.url, self.filters, self.on_event,
                            self.last_log_id, self.on_closed)

    def connect(self):
        """Delegate to the current ``_EventClient.connect``."""
        self.ec.connect()

    def close_connection(self):
        """Delegate to the current ``_EventClient.close_connection``.

        Unlike ``close``, this does not set ``is_closed``.
        """
        self.ec.close_connection()

    def subscribe(self, f, last_log_id=None):
        """Remember filter ``f`` and subscribe to it on the current client."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Forget filter ``f`` and unsubscribe from it on the current client.

        :raises ValueError: if ``f`` is not in ``self.filters``.
        """
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Mark this client as deliberately closed and close the connection.

        ``is_closed`` is set first so that ``on_closed`` does not treat the
        resulting close as unexpected.
        """
        self.is_closed = True
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Record the event id (if any) and forward ``m`` to the user callback.

        ``m`` must support ``.get``; an ``'id'`` of ``None`` (or a missing
        key) leaves ``last_log_id`` untouched.
        """
        event_id = m.get('id')
        if event_id is not None:
            self.last_log_id = event_id
        self.on_event_cb(m)

    def on_closed(self):
        """Reconnect with a fresh ``_EventClient`` unless ``close()`` was called.

        This method is handed to ``_EventClient`` as its last constructor
        argument; presumably it is invoked when the connection closes.
        Connection attempts are retried every ``RECONNECT_DELAY`` seconds
        until one succeeds or ``close()`` is called in the meantime.
        """
        if self.is_closed:
            return
        _logger.warning("Unexpected close. Reconnecting.")
        self.ec = self._new_client()
        # Re-check is_closed on every attempt so a close() issued while we
        # are retrying stops the loop instead of reconnecting forever.
        while not self.is_closed:
            try:
                self.ec.connect()
            except Exception as e:
                _logger.warning(
                    "Error '%s' during websocket reconnect. Will retry after %ss.",
                    e, self.RECONNECT_DELAY, exc_info=True)
                time.sleep(self.RECONNECT_DELAY)
            else:
                break