+from __future__ import absolute_import
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from builtins import object
import arvados
-import config
-import errors
-from retry import RetryLoop
+from . import config
+from . import errors
+from .retry import RetryLoop
import logging
import json
-import thread
+import _thread
import threading
import time
import os
self.filters = [[]]
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
- self.is_closed = False
- self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+ self.is_closed = threading.Event()
+ self._setup_event_client()
- def connect(self):
- self.ec.connect()
-
- def close_connection(self):
- self.ec.close_connection()
# Create a new _EventClient from this object's url, filters, last_log_id and
# its on_event/on_closed callbacks, store it as self.ec, and connect it.
# Raises whatever connect() raises, after calling close_connection() on the
# client that failed to connect.
+ def _setup_event_client(self):
+ self.ec = _EventClient(self.url, self.filters, self.on_event,
+ self.last_log_id, self.on_closed)
# NOTE(review): presumably _EventClient is thread-like and this keeps it from
# blocking interpreter exit -- confirm against the _EventClient definition.
+ self.ec.daemon = True
+ try:
+ self.ec.connect()
+ except Exception:
# Clean up the client that failed to connect, then let the caller see the
# original exception.
+ self.ec.close_connection()
+ raise
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
self.ec.unsubscribe(f)
# Close the client deliberately. The closed flag is raised before the
# underlying event client is closed; on_closed() only attempts a reconnect
# while that flag is unset. code, reason and timeout are passed through
# unchanged to self.ec.close().
def close(self, code=1000, reason='', timeout=0):
# The patch replaces the boolean flag with a threading.Event (see __init__).
- self.is_closed = True
+ self.is_closed.set()
self.ec.close(code, reason, timeout)
def on_event(self, m):
self.on_event_cb(m)
except Exception as e:
_logger.exception("Unexpected exception from event callback.")
- thread.interrupt_main()
+ _thread.interrupt_main()
# Close callback passed to _EventClient. When the close was not requested
# through close() (is_closed is unset), try to re-establish the connection.
# Old behavior (removed lines): build a new _EventClient and retry connect()
# indefinitely, sleeping 5 seconds after each failure.
# New behavior (added lines): retry _setup_event_client() under a RetryLoop
# with num_retries=25, backoff_start=.1, max_wait=15; when tries_left is 0,
# log the failure, set is_closed, and interrupt the main thread.
def on_closed(self):
- if self.is_closed == False:
- _logger.warn("Unexpected close. Reconnecting.")
- self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
- while True:
- try:
- self.ec.connect()
- break
- except Exception as e:
- _logger.warn("Error '%s' during websocket reconnect. Will retry after 5s.", e, exc_info=e)
- time.sleep(5)
+ if not self.is_closed.is_set():
+ _logger.warning("Unexpected close. Reconnecting.")
+ for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
+ try:
+ self._setup_event_client()
+ _logger.warning("Reconnect successful.")
+ break
+ except Exception as e:
+ _logger.warning("Error '%s' during websocket reconnect.", e)
# NOTE(review): indentation is not visible in this patch view. Logger.exception
# records the exception currently being handled, so confirm the call below
# runs inside the except handler; otherwise no traceback will be logged.
+ if tries_left == 0:
+ _logger.exception("EventClient thread could not contact websocket server.")
# Setting is_closed also releases any caller waiting in run_forever().
+ self.is_closed.set()
# _thread.interrupt_main() raises KeyboardInterrupt in the main thread.
+ _thread.interrupt_main()
+ return
+
# Block the calling thread until is_closed is set (by close(), or by
# on_closed() after reconnect attempts are exhausted).
+ def run_forever(self):
+ # Have to poll here to let KeyboardInterrupt get raised.
# Event.wait(1) returns False on each 1-second timeout and True once the
# flag is set, which ends the loop.
+ while not self.is_closed.wait(1):
+ pass
class PollClient(threading.Thread):
self._closing_lock = threading.RLock()
def run(self):
- self.id = 0
if self.last_log_id != None:
- self.id = self.last_log_id
+ # Caller supplied the last-seen event ID from a previous
+ # connection
+ skip_old_events = [["id", ">", str(self.last_log_id)]]
else:
- for f in self.filters:
- for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
- try:
- items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
- break
- except errors.ApiError as error:
- pass
- else:
- tries_left = 0
- break
- if tries_left == 0:
- _logger.exception("PollClient thread could not contact API server.")
- with self._closing_lock:
- self._closing.set()
- thread.interrupt_main()
- return
- if items:
- if items[0]['id'] > self.id:
- self.id = items[0]['id']
+ # We need to do a reverse-order query to find the most
+ # recent event ID (see "if not skip_old_events" below).
+ skip_old_events = False
self.on_event({'status': 200})
while not self._closing.is_set():
- max_id = self.id
moreitems = False
for f in self.filters:
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
try:
- items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ if not skip_old_events:
+ # If the caller didn't provide a known
+ # recent ID, our first request will ask
+ # for the single most recent event from
+ # the last 2 hours (the time restriction
+ # avoids doing an expensive database
+ # query, and leaves a big enough margin to
+ # account for clock skew). If we do find a
+ # recent event, we remember its ID but
+ # then discard it (we are supposed to be
+ # returning new/current events, not old
+ # ones).
+ #
+ # Subsequent requests will get multiple
+ # events in chronological order, and
+ # filter on that same cutoff time, or
+ # (once we see our first matching event)
+ # the ID of the last-seen event.
+ skip_old_events = [[
+ "created_at", ">=",
+ time.strftime(
+ "%Y-%m-%dT%H:%M:%SZ",
+ time.gmtime(time.time()-7200))]]
+ items = self.api.logs().list(
+ order="id desc",
+ limit=1,
+ filters=f+skip_old_events).execute()
+ if items["items"]:
+ skip_old_events = [
+ ["id", ">", str(items["items"][0]["id"])]]
+ items = {
+ "items": [],
+ "items_available": 0,
+ }
+ else:
+ # In this case, either we know the most
+ # recent matching ID, or we know there
+ # were no matching events in the 2-hour
+ # window before subscribing. Either way we
+ # can safely ask for events in ascending
+ # order.
+ items = self.api.logs().list(
+ order="id asc",
+ filters=f+skip_old_events).execute()
break
except errors.ApiError as error:
pass
+ else:
+ tries_left = 0
+ break
if tries_left == 0:
_logger.exception("PollClient thread could not contact API server.")
with self._closing_lock:
self._closing.set()
- thread.interrupt_main()
+ _thread.interrupt_main()
return
for i in items["items"]:
- if i['id'] > max_id:
- max_id = i['id']
+ skip_old_events = [["id", ">", str(i["id"])]]
with self._closing_lock:
if self._closing.is_set():
return
self.on_event(i)
except Exception as e:
_logger.exception("Unexpected exception from event callback.")
- thread.interrupt_main()
+ _thread.interrupt_main()
if items["items_available"] > len(items["items"]):
moreitems = True
- self.id = max_id
if not moreitems:
self._closing.wait(self.poll_time)
if not endpoint:
raise errors.FeatureNotEnabledError(
"Server does not advertise a websocket endpoint")
+ uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
try:
- uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
client = EventClient(uri_with_token, filters, on_event, last_log_id)
- ok = False
- try:
- client.connect()
- ok = True
- return client
- finally:
- if not ok:
- client.close_connection()
- except:
- _logger.warn("Failed to connect to websockets on %s" % endpoint)
+ except Exception:
+ _logger.warning("Failed to connect to websockets on %s" % endpoint)
raise
+ else:
+ return client
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
else:
_logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
except Exception as e:
- _logger.warn("Falling back to polling after websocket error: %s" % e)
+ _logger.warning("Falling back to polling after websocket error: %s" % e)
p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
p.start()
return p