X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3dd2a1957ae4106bfc2bd5405662c47c087eb79c..570509ab4d2ef93d870fd2b1f2eab178afb1bad9:/sdk/python/arvados/events.py diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py index 68cfa21870..a1b4638439 100644 --- a/sdk/python/arvados/events.py +++ b/sdk/python/arvados/events.py @@ -83,7 +83,7 @@ class EventClient(object): self.filters = [[]] self.on_event_cb = on_event_cb self.last_log_id = last_log_id - self.is_closed = False + self.is_closed = threading.Event() self._setup_event_client() def _setup_event_client(self): @@ -96,12 +96,6 @@ class EventClient(object): self.ec.close_connection() raise - def connect(self): - self.ec.connect() - - def close_connection(self): - self.ec.close_connection() - def subscribe(self, f, last_log_id=None): self.filters.append(f) self.ec.subscribe(f, last_log_id) @@ -111,7 +105,7 @@ class EventClient(object): self.ec.unsubscribe(f) def close(self, code=1000, reason='', timeout=0): - self.is_closed = True + self.is_closed.set() self.ec.close(code, reason, timeout) def on_event(self, m): @@ -124,20 +118,26 @@ class EventClient(object): thread.interrupt_main() def on_closed(self): - if self.is_closed == False: + if not self.is_closed.is_set(): _logger.warn("Unexpected close. Reconnecting.") for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15): try: self._setup_event_client() + _logger.warn("Reconnect successful.") break except Exception as e: _logger.warn("Error '%s' during websocket reconnect.", e) if tries_left == 0: _logger.exception("EventClient thread could not contact websocket server.") - self.is_closed = True + self.is_closed.set() thread.interrupt_main() return + def run_forever(self): + # Have to poll here to let KeyboardInterrupt get raised. + while not self.is_closed.wait(1): + pass + class PollClient(threading.Thread): def __init__(self, api, filters, on_event, poll_time, last_log_id):