+ def close(self, code=1000, reason='', timeout=0):
+ self.is_closed.set()
+ self.ec.close(code, reason, timeout)
+
+ def on_event(self, m):
+ if m.get('id') != None:
+ self.last_log_id = m.get('id')
+ try:
+ self.on_event_cb(m)
+ except Exception as e:
+ _logger.exception("Unexpected exception from event callback.")
+ thread.interrupt_main()
+
+ def on_closed(self):
+ if not self.is_closed.is_set():
+ _logger.warn("Unexpected close. Reconnecting.")
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
+ try:
+ self._setup_event_client()
+ break
+ except Exception as e:
+ _logger.warn("Error '%s' during websocket reconnect.", e)
+ if tries_left == 0:
+ _logger.exception("EventClient thread could not contact websocket server.")
+ self.is_closed.set()
+ thread.interrupt_main()
+ return
+
+ def run_forever(self):
+ # Have to poll here to let KeyboardInterrupt get raised.
+ while not self.is_closed.wait(1):
+ pass
+
+
+class PollClient(threading.Thread):
+ def __init__(self, api, filters, on_event, poll_time, last_log_id):
+ super(PollClient, self).__init__()
+ self.api = api
+ if filters:
+ self.filters = [filters]
+ else:
+ self.filters = [[]]
+ self.on_event = on_event
+ self.poll_time = poll_time
+ self.daemon = True
+ self.last_log_id = last_log_id
+ self._closing = threading.Event()
+ self._closing_lock = threading.RLock()
+
+ def run(self):
+ self.id = 0
+ if self.last_log_id != None:
+ self.id = self.last_log_id
+ else:
+ for f in self.filters:
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+ try:
+ items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
+ break
+ except errors.ApiError as error:
+ pass
+ else:
+ tries_left = 0
+ break
+ if tries_left == 0:
+ _logger.exception("PollClient thread could not contact API server.")
+ with self._closing_lock:
+ self._closing.set()
+ thread.interrupt_main()
+ return
+ if items:
+ if items[0]['id'] > self.id:
+ self.id = items[0]['id']
+
+ self.on_event({'status': 200})
+
+ while not self._closing.is_set():
+ max_id = self.id
+ moreitems = False
+ for f in self.filters:
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
+ try:
+ items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ break
+ except errors.ApiError as error:
+ pass
+ else:
+ tries_left = 0
+ break
+ if tries_left == 0:
+ _logger.exception("PollClient thread could not contact API server.")
+ with self._closing_lock:
+ self._closing.set()
+ thread.interrupt_main()
+ return
+ for i in items["items"]:
+ if i['id'] > max_id:
+ max_id = i['id']
+ with self._closing_lock:
+ if self._closing.is_set():
+ return
+ try:
+ self.on_event(i)
+ except Exception as e:
+ _logger.exception("Unexpected exception from event callback.")
+ thread.interrupt_main()
+ if items["items_available"] > len(items["items"]):
+ moreitems = True
+ self.id = max_id
+ if not moreitems:
+ self._closing.wait(self.poll_time)
+
+ def run_forever(self):
+ # Have to poll here, otherwise KeyboardInterrupt will never get processed.
+ while not self._closing.is_set():
+ self._closing.wait(1)
+
+ def close(self, code=None, reason=None, timeout=0):
+ """Close poll client and optionally wait for it to finish.
+
+ If an :on_event: handler is running in a different thread,
+ first wait (indefinitely) for it to return.
+
+ After closing, wait up to :timeout: seconds for the thread to
+ finish the poll request in progress (if any).
+
+ :code: and :reason: are ignored. They are present for
+ interface compatibility with EventClient.
+ """
+
+ with self._closing_lock:
+ self._closing.set()