# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+"""Follow events on an Arvados cluster
+
+This module provides different ways to get notified about events that happen
+on an Arvados cluster. You indicate which events you want updates about, and
+provide a function that is called any time one of those events is received
+from the server.
+
+`subscribe` is the main entry point. It helps you construct one of the two
+API-compatible client classes: `EventClient` (which uses WebSockets) or
+`PollClient` (which periodically queries the logs list methods).
+"""
import enum
import json
disconnecting. Default 1000.
* reason: str --- The WebSocket close reason sent to the server when
- disconnecting. Default is the empty string.
+ disconnecting. Default is an empty string.
* timeout: float --- How long to wait for the WebSocket server to
acknowledge the disconnection, in seconds. Default 0, which means
class PollClient(threading.Thread):
- def __init__(self, api, filters, on_event, poll_time, last_log_id):
+ """Follow Arvados events via polling logs
+
+ PollClient follows events on Arvados cluster by periodically running
+ logs list API calls. Users can select the events they want to follow and
+ run their own callback function on each.
+ """
+ def __init__(
+ self,
+ api: 'arvados.api_resources.ArvadosAPIClient',
+ filters: Optional[Filter],
+ on_event: EventCallback,
+ poll_time: float=15,
+ last_log_id: Optional[int]=None,
+ ) -> None:
+ """Initialize a polling client
+
+ Constructor arguments:
+
+ * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
+ client used to query logs. It will be used in a separate thread,
+ so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
+ it should not be reused after the thread is started.
+
+ * filters: arvados.events.Filter | None --- One event filter to
+ subscribe to after connecting to the WebSocket server. If not
+ specified, the client will subscribe to all events.
+
+ * on_event: arvados.events.EventCallback --- When the client
+ receives an event from the WebSocket server, it calls this
+ function with the event object.
+
+ * poll_time: float --- The number of seconds to wait between querying
+ logs. Default 15.
+
+ * last_log_id: int | None --- If specified, queries will include a
+ filter for logs with an `id` at least this value.
+ """
super(PollClient, self).__init__()
self.api = api
if filters:
self._skip_old_events = False
def run(self):
+ """Run the client loop
+
+ This method runs in a separate thread to poll and process events
+ from the server.
+ """
self.on_event({'status': 200})
while not self._closing.is_set():
self._closing.wait(self.poll_time)
def run_forever(self):
+ """Run the polling client indefinitely
+
+ This method blocks until the `close` method is called (e.g., from
+ another thread) or the client permanently loses its connection.
+ """
# Have to poll here, otherwise KeyboardInterrupt will never get processed.
while not self._closing.is_set():
self._closing.wait(1)
- def close(self, code=None, reason=None, timeout=0):
- """Close poll client and optionally wait for it to finish.
+ def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
+ """Stop polling and processing events
- If an :on_event: handler is running in a different thread,
- first wait (indefinitely) for it to return.
+ Arguments:
- After closing, wait up to :timeout: seconds for the thread to
- finish the poll request in progress (if any).
+ * code: Optional[int] --- Ignored; this argument exists for API
+ compatibility with `EventClient.close`.
- :code: and :reason: are ignored. They are present for
- interface compatibility with EventClient.
- """
+ * reason: Optional[str] --- Ignored; this argument exists for API
+ compatibility with `EventClient.close`.
+ * timeout: float --- How long to wait for the client thread to finish
+ processing events. Default 0, which means no timeout.
+ """
with self._closing_lock:
self._closing.set()
try:
# to do so raises the same exception."
pass
- def subscribe(self, f):
+ def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
+ """Subscribe to another set of events from the server
+
+ Arguments:
+
+ * f: arvados.events.Filter | None --- One filter to subscribe to.
+
+ * last_log_id: Optional[int] --- Ignored; this argument exists for
+ API compatibility with `EventClient.subscribe`.
+ """
self.on_event({'status': 200})
self.filters.append(f)
def unsubscribe(self, f):
+ """Unsubscribe from an event stream
+
+ Arguments:
+
+ * f: arvados.events.Filter | None --- One event filter to stop
+ receiving events for.
+ """
del self.filters[self.filters.index(f)]
else:
return client
-
-def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
- """
- :api:
- a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
- :filters:
- Initial subscription filters.
- :on_event:
- The callback when a message is received.
- :poll_fallback:
- If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available.
- :last_log_id:
- Log rows that are newer than the log id
+def subscribe(
+ api: 'arvados.api_resources.ArvadosAPIClient',
+ filters: Optional[Filter],
+ on_event: EventCallback,
+ poll_fallback: float=15,
+ last_log_id: Optional[int]=None,
+) -> Union[EventClient, PollClient]:
+ """Start a thread to monitor events
+
+ This method tries to construct an `EventClient` to process Arvados
+ events via WebSockets. If that fails, or the
+ `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
+ back to constructing a `PollClient` to process the events via API
+ polling.
+
+ Arguments:
+
+ * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
+ client used to query logs. It may be used in a separate thread,
+ so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
+ it should not be reused after this method returns.
+
+ * filters: arvados.events.Filter | None --- One event filter to
+ subscribe to after initializing the client. If not specified, the
+ client will subscribe to all events.
+
+ * on_event: arvados.events.EventCallback --- When the client receives an
+ event, it calls this function with the event object.
+
+ * poll_time: float --- The number of seconds to wait between querying
+ logs. If 0, this function will refuse to construct a `PollClient`.
+ Default 15.
+
+ * last_log_id: int | None --- If specified, start processing events with
+ at least this `id` value.
"""
-
if not poll_fallback:
return _subscribe_websocket(api, filters, on_event, last_log_id)