class _EventClient(WebSocketClient):
- def __init__(self, url, filters, on_event, last_log_id, on_closed=None):
+ def __init__(self, url, filters, on_event, last_log_id, on_closed):
ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
ssl_options['cert_reqs'] = ssl.CERT_NONE
# will be attempted -- and it might not be the right one. See
# ws4py's WebSocketBaseClient.__init__.
super(_EventClient, self).__init__(url, ssl_options=ssl_options)
- self.filters = filters
+
+ if filters:
+ self.filters = filters
+ else:
+ self.filters = [[]]
self.on_event = on_event
self.last_log_id = last_log_id
self._closing_lock = threading.RLock()
self.last_log_id = last_log_id
self.is_closed = False
self.subscriptions = {}
- self.subscriptions[str(filters)] = filters
self.ec = _EventClient(url, filters, self.on_event, last_log_id, self.on_closed)
def connect(self):
def on_closed(self):
if self.is_closed == False:
- filters = []
- for s in self.subscriptions:
- filters.append(self.subscriptions[s])
self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
- self.ec.connect()
+ while True:
+ try:
+ self.ec.connect()
+ for s in self.subscriptions:
+ self.ec.subscribe(self.subscriptions[s], self.last_log_id)
+ break
+ except:
+ _logger.warn("Failed to reconnect to websockets on %s. Will retry after 5s." % endpoint)
+ time.sleep(5)
class PollClient(threading.Thread):
with self.assertRaises(Queue.Empty):
self.assertEqual(events.get(True, 2), None)
- # create one more obj
- human2 = arvados.api('v1').humans().create(body={}).execute()
-
# close (im)properly
if close_unexpected:
self.ws.close_connection()
else:
self.ws.close()
+ # create one more obj
+ human2 = arvados.api('v1').humans().create(body={}).execute()
+
# (un)expect the object creation event
if close_unexpected:
log_object_uuids = []