+ extra['last_log_id'] = last_log_id
+ return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)
+
def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
    """Send one subscription request to the WebSocket server

    The request is serialized as a JSON object and handed to the
    underlying client's `send` method.

    Arguments:

    * method: WSMethod --- The operation to request. Its `value` is sent
      as the `method` field.

    * f: arvados.events.Filter --- The filter sent as the `filters` field.

    * **extra: Any --- Additional fields added to the request after the
      two fields above.
    """
    request = {'method': method.value, 'filters': f}
    # Extras are merged last, exactly as a trailing `**extra` would be.
    request.update(extra)
    self._client.send(json.dumps(request))
+
def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
    """Disconnect from the WebSocket server and stop handling events

    Arguments:

    * code: int --- The WebSocket close code to send to the server.
      Default 1000.

    * reason: str --- The WebSocket close reason to send to the server.
      Default is the empty string.

    * timeout: float --- The number of seconds to wait for the server to
      acknowledge the disconnection. Default 0, which means no timeout.
    """
    # Flag the client as closed before disconnecting: `on_closed` checks
    # this flag to tell a deliberate close from a lost connection.
    self.is_closed.set()
    # Any falsy timeout (including the default 0) is passed on as None.
    close_timeout = timeout if timeout else None
    self._client.close_timeout = close_timeout
    self._client.close(code, reason)
+
def run_forever(self) -> None:
    """Block the caller for as long as the client is open

    This method returns once `is_closed` is set, which happens when the
    `close` method is called (e.g., from another thread) or when the
    client gives up on reconnecting.
    """
    # Wait in one-second slices rather than indefinitely so that a
    # KeyboardInterrupt can be raised between waits.
    closed = False
    while not closed:
        closed = self.is_closed.wait(1)
+
def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
    """Start receiving events that match an additional filter

    The filter is sent to the server and then recorded in `filters`.

    Arguments:

    * f: arvados.events.Filter --- One filter to subscribe to events
      for.

    * last_log_id: int | None --- If given, request events starting from
      this id. If omitted, the server only sends events that occur after
      it processes the subscription.
    """
    subscribe_lock = self._subscribe_lock
    # Send the request and record the filter under the same lock, so the
    # recorded list only changes together with a sent request.
    with subscribe_lock:
        self._subscribe(f, last_log_id)
        self.filters.append(f)
+
def unsubscribe(self, f: Filter) -> None:
    """Stop receiving events for a previously subscribed filter

    Arguments:

    * f: arvados.events.Filter --- One event filter to stop receiving
      events for.

    Raises `ValueError` if the filter is not in the recorded list of
    subscriptions.
    """
    with self._subscribe_lock:
        try:
            position = self.filters.index(f)
        except ValueError:
            # Replace the bare list error with one that names the filter.
            raise ValueError(f"filter not subscribed: {f!r}") from None
        # Tell the server first; the filter is only forgotten locally
        # once the request has been sent without raising.
        self._update_sub(WSMethod.UNSUBSCRIBE, f)
        self.filters.pop(position)
+
def on_closed(self) -> None:
    """React to losing the connection to the WebSocket server

    This method is called when the client stops receiving events because
    the connection closed. If the close was not requested client-side,
    this implementation retries the connection. When every attempt
    fails, it marks the client closed and interrupts the main thread.
    """
    if self.is_closed.is_set():
        # Closed on purpose: nothing to recover.
        return
    _logger.warning("Unexpected close. Reconnecting.")
    reconnected = False
    for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
        try:
            self._connect()
        except Exception as e:
            _logger.warning("Error '%s' during websocket reconnect.", e)
            continue
        _logger.warning("Reconnect successful.")
        reconnected = True
        break
    if not reconnected:
        # The retry loop ran out without a successful connection.
        _logger.error("EventClient thread could not contact websocket server.")
        self.is_closed.set()
        _thread.interrupt_main()
+
def on_event(self, m: Dict[str, Any]) -> None:
    """Process one event received from the WebSocket server

    This method is called for every event the client receives. This
    implementation remembers the event's `id` field, when it has one, in
    `last_log_id`, then passes the event to the callback function stored
    in `on_event_cb`.

    Arguments:

    * m: Dict[str, Any] --- The event object, deserialized from JSON.
    """
    try:
        event_id = m['id']
    except KeyError:
        # Events without an id leave the recorded id untouched.
        pass
    else:
        self.last_log_id = event_id
    try:
        self.on_event_cb(m)
    except Exception:
        # A failing callback is logged and the main thread is interrupted.
        _logger.exception("Unexpected exception from event callback.")
        _thread.interrupt_main()
+
def run(self) -> None:
    """Run the client loop

    This method runs in a separate thread to receive and process events
    from the server. Each pass of the outer loop re-sends every recorded
    filter, resuming from `last_log_id`, and then hands each received
    message to `on_event` until the connection closes. A closed
    connection is passed to `on_closed`; the loop ends when the client is
    marked closed or `_client_ok` stays false.
    """
    # Thread.setName() is deprecated since Python 3.10 and emits a
    # DeprecationWarning; assigning the `name` attribute is equivalent.
    self.name = f'ArvadosWebsockets-{self.ident}'
    while self._client_ok and not self.is_closed.is_set():
        try:
            # Hold the lock only while re-sending subscriptions, so that
            # subscribe/unsubscribe are not blocked while receiving.
            with self._subscribe_lock:
                for f in self.filters:
                    self._subscribe(f, self.last_log_id)
            for msg_s in self._client:
                # Messages that arrive after a close are discarded.
                if not self.is_closed.is_set():
                    msg = json.loads(msg_s)
                    self.on_event(msg)
        except ws_exc.ConnectionClosed:
            # Cleared here before on_closed runs; nothing in this method
            # sets it back, so the loop relies on the reconnect path
            # (not visible in this block) to restore it.
            self._client_ok = False
            self.on_closed()