+
class PollClient(threading.Thread):
    """Daemon thread that polls the API ``logs()`` list and reports new events.

    Each record returned by ``api.logs().list(...)`` is passed to the
    ``on_event`` callback.  The thread keeps polling until :meth:`close`
    is called or the API cannot be reached after repeated retries.
    """

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        """Set up a poll client (the thread is not started here).

        :api: client object; ``api.logs().list(...).execute()`` is used
          to fetch events.
        :filters: initial filter list, or a false value to poll without
          any filter.
        :on_event: callable invoked with ``{'status': 200}`` when polling
          starts or a filter is subscribed, and with each event record.
        :poll_time: seconds to wait between polls when no further
          events are pending; also passed to RetryLoop as ``max_wait``.
        :last_log_id: ID of the last event the caller has already seen,
          or None to start from the most recent event.
        """
        super(PollClient, self).__init__()
        self.api = api
        # self.filters is a list of filter lists; each entry is polled
        # with its own request on every pass.
        self.filters = [filters] if filters else [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        # Held while an on_event callback runs, so close() can wait for
        # a callback in progress to return.
        self._closing_lock = threading.RLock()

    def _fetch_events(self, event_filter, skip_old_events):
        """Make a single ``logs().list()`` request for one filter.

        Returns an ``(items, skip_old_events)`` tuple: the response to
        dispatch, and the "skip old events" filter to use for the next
        request.  Exceptions raised by the request propagate to the
        caller; in that case nothing is returned, so the caller's
        ``skip_old_events`` stays untouched and a retry repeats the
        same kind of request.
        """
        if skip_old_events:
            # Either we know the most recent matching ID, or we know
            # there were no matching events in the 2-hour window before
            # subscribing. Either way we can safely ask for events in
            # ascending order.
            items = self.api.logs().list(
                order="id asc",
                filters=event_filter+skip_old_events).execute()
            return items, skip_old_events

        # The caller didn't provide a known recent ID, so this first
        # request asks for the single most recent event from the last
        # 2 hours (the time restriction avoids doing an expensive
        # database query, and leaves a big enough margin to account for
        # clock skew). If we do find a recent event, we remember its ID
        # but then discard it (we are supposed to be returning
        # new/current events, not old ones).
        #
        # Subsequent requests will get multiple events in chronological
        # order, and filter on that same cutoff time, or (once we see
        # our first matching event) the ID of the last-seen event.
        cutoff = [[
            "created_at", ">=",
            time.strftime(
                "%Y-%m-%dT%H:%M:%SZ",
                time.gmtime(time.time()-7200))]]
        items = self.api.logs().list(
            order="id desc",
            limit=1,
            filters=event_filter+cutoff).execute()
        if not items["items"]:
            return items, cutoff
        newest_id = items["items"][0]["id"]
        discarded = {
            "items": [],
            "items_available": 0,
        }
        return discarded, [["id", ">", str(newest_id)]]

    def run(self):
        """Thread body: poll every subscribed filter until closed.

        Calls ``on_event({'status': 200})`` once before polling starts,
        then ``on_event(record)`` for each event received.  If the API
        keeps raising ``errors.ApiError`` until the retry loop is
        exhausted, the client marks itself closed, interrupts the main
        thread and returns.
        """
        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection
            skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see _fetch_events).
            skip_old_events = False

        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            for event_filter in self.filters:
                retries = RetryLoop(
                    num_retries=25, backoff_start=.1, max_wait=self.poll_time)
                for _tries_left in retries:
                    try:
                        items, skip_old_events = self._fetch_events(
                            event_filter, skip_old_events)
                    except errors.ApiError:
                        # Failed attempt; move on to the next try the
                        # retry loop offers (if any).
                        continue
                    break
                else:
                    # Reached only when every attempt raised, so a
                    # success on the final attempt is never mistaken
                    # for a failure.
                    _logger.exception("PollClient thread could not contact API server.")
                    with self._closing_lock:
                        self._closing.set()
                    thread.interrupt_main()
                    return
                for event in items["items"]:
                    skip_old_events = [["id", ">", str(event["id"])]]
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(event)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    # The server has more matching events than it
                    # returned; poll again without waiting.
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block the calling thread until the client has been closed."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f):
        """Add filter list :f: to the polled filters.

        ``on_event({'status': 200})`` is called before the filter is added.
        """
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Remove the first subscribed filter equal to :f:.

        Raises ValueError if no such filter is subscribed.
        """
        self.filters.remove(f)
+
+
+def _subscribe_websocket(api, filters, on_event, last_log_id=None):
+ endpoint = api._rootDesc.get('websocketUrl', None)
+ if not endpoint:
+ raise errors.FeatureNotEnabledError(
+ "Server does not advertise a websocket endpoint")
+ uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)