# Module-level logger, registered under the 'arvados.events' name.
_logger = logging.getLogger('arvados.events')
-class EventClient(WebSocketClient):
- def __init__(self, url, filters, on_event, last_log_id):
+class _EventClient(WebSocketClient):
+ def __init__(self, url, filters, on_event, last_log_id, on_closed):
ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
ssl_options['cert_reqs'] = ssl.CERT_NONE
# IPv4 addresses (common with "localhost"), only one of them
# will be attempted -- and it might not be the right one. See
# ws4py's WebSocketBaseClient.__init__.
- super(EventClient, self).__init__(url, ssl_options=ssl_options)
+ super(_EventClient, self).__init__(url, ssl_options=ssl_options)
+
self.filters = filters
self.on_event = on_event
self.last_log_id = last_log_id
self._closing_lock = threading.RLock()
self._closing = False
self._closed = threading.Event()
+ self.on_closed = on_closed
def opened(self):
    """Subscribe to every filter in ``self.filters``.

    One subscribe request is sent per filter, each carrying
    ``self.last_log_id``.
    """
    resume_from = self.last_log_id
    for event_filter in self.filters:
        self.subscribe(event_filter, resume_from)
def closed(self, code, reason=None):
    """Record that the connection has closed, then notify the owner.

    :code: and :reason: are accepted but not used here.
    """
    # Release anyone blocked in close() waiting on this event ...
    self._closed.set()
    # ... and only then run the callback supplied at construction.
    self.on_closed()
def received_message(self, m):
with self._closing_lock:
:timeout: is the number of seconds to wait for ws4py to
indicate that the connection has closed.
"""
- super(EventClient, self).close(code, reason)
+ super(_EventClient, self).close(code, reason)
with self._closing_lock:
# make sure we don't process any more messages.
self._closing = True
# wait for ws4py to tell us the connection is closed.
self._closed.wait(timeout=timeout)
def subscribe(self, f, last_log_id=None):
    """Send a subscribe request for filter ``f`` over the socket.

    :f: the filter to subscribe to.
    :last_log_id: when not None, included in the request as
      ``last_log_id``.
    """
    request = {"method": "subscribe", "filters": f}
    if last_log_id is not None:
        request["last_log_id"] = last_log_id
    payload = json.dumps(request)
    self.send(payload)
def unsubscribe(self, f):
    """Send an unsubscribe request for filter ``f`` over the socket."""
    request = {"method": "unsubscribe", "filters": f}
    self.send(json.dumps(request))
+
+
class EventClient(object):
    """Websocket event subscriber that reconnects after an unexpected close.

    Owns an ``_EventClient`` connection (``self.ec``) and keeps the state
    needed to rebuild it: the list of subscribed filters and the ``id`` of
    the most recent message that carried one.  When the connection reports
    that it closed and ``close()`` has not been called, a replacement
    connection is created from that state.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        """Create the initial websocket client (``connect()`` opens it).

        :url: is the websocket endpoint handed to ``_EventClient``.
        :filters: is the initial filter; a falsy value subscribes with
          an empty filter.
        :on_event_cb: is called with every message received.
        :last_log_id: is the log id passed along with subscribe
          requests; may be None.
        """
        self.url = url
        self.filters = [filters] if filters else [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        self.is_closed = False
        # Set by close() so a pending reconnect back-off wakes up at once.
        self._stop_reconnecting = threading.Event()
        self.ec = self._new_client()

    def _new_client(self):
        """Build an ``_EventClient`` from the current filters and log id."""
        return _EventClient(self.url, self.filters, self.on_event,
                            self.last_log_id, self.on_closed)

    def connect(self):
        """Open the underlying websocket connection."""
        self.ec.connect()

    def close_connection(self):
        """Forward ``close_connection()`` to the underlying client."""
        self.ec.close_connection()

    def subscribe(self, f, last_log_id=None):
        """Remember filter ``f`` and subscribe to it on the live connection."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Forget filter ``f`` and unsubscribe from it.

        Raises ValueError when ``f`` is not a current filter.
        """
        del self.filters[self.filters.index(f)]
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Close the connection for good; no reconnect will follow.

        :code:, :reason: and :timeout: are passed through to the
        underlying client's ``close()``.
        """
        self.is_closed = True
        self._stop_reconnecting.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Track the id of message ``m`` (if any), then run the callback."""
        event_id = m.get('id')
        if event_id is not None:
            self.last_log_id = event_id
        self.on_event_cb(m)

    def on_closed(self):
        """Reconnect after the connection closed without ``close()``.

        Retries until a connection succeeds or ``close()`` is called.
        """
        if self.is_closed:
            return
        _logger.warning("Unexpected close. Reconnecting.")
        while not self.is_closed:
            try:
                # A fresh client per attempt: building it can itself fail,
                # and a socket whose connect() failed should not be reused.
                self.ec = self._new_client()
                self.ec.connect()
                break
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect. Will retry after 5s.", e, exc_info=True)
                # Interruptible 5s back-off: returns early once close() runs.
                self._stop_reconnecting.wait(5)
class PollClient(threading.Thread):
# to do so raises the same exception."
pass
def subscribe(self, f):
    """Add filter ``f`` to the polled filters.

    The event callback is first invoked with a ``{'status': 200}``
    message, and only then is the filter recorded.
    """
    acknowledgement = {'status': 200}
    self.on_event(acknowledgement)
    self.filters.append(f)
def unsubscribe(self, f):
    """Drop the first occurrence of filter ``f`` from the polled filters.

    Raises ValueError when ``f`` is not currently subscribed.
    """
    position = self.filters.index(f)
    del self.filters[position]
def _subscribe_websocket(api, filters, on_event, last_log_id=None):