Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / python / arvados / events.py
index 8b81bf39398c7ae0358d057885129ad5971f6001..88a916e659e54643468536a11c94474ccb2ee3d0 100644 (file)
@@ -1,6 +1,17 @@
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
+"""Follow events on an Arvados cluster
+
+This module provides different ways to get notified about events that happen
+on an Arvados cluster. You indicate which events you want updates about, and
+provide a function that is called any time one of those events is received
+from the server.
+
+`subscribe` is the main entry point. It helps you construct one of the two
+API-compatible client classes: `EventClient` (which uses WebSockets) or
+`PollClient` (which periodically queries the logs list methods).
+"""
 
 import enum
 import json
@@ -151,7 +162,7 @@ class EventClient(threading.Thread):
           disconnecting. Default 1000.
 
         * reason: str --- The WebSocket close reason sent to the server when
-          disconnecting. Default is the empty string.
+          disconnecting. Default is an empty string.
 
         * timeout: float --- How long to wait for the WebSocket server to
           acknowledge the disconnection, in seconds. Default 0, which means
@@ -269,7 +280,43 @@ class EventClient(threading.Thread):
 
 
 class PollClient(threading.Thread):
-    def __init__(self, api, filters, on_event, poll_time, last_log_id):
+    """Follow Arvados events via polling logs
+
+    PollClient follows events on Arvados cluster by periodically running
+    logs list API calls. Users can select the events they want to follow and
+    run their own callback function on each.
+    """
+    def __init__(
+            self,
+            api: 'arvados.api_resources.ArvadosAPIClient',
+            filters: Optional[Filter],
+            on_event: EventCallback,
+            poll_time: float=15,
+            last_log_id: Optional[int]=None,
+    ) -> None:
+        """Initialize a polling client
+
+        Constructor arguments:
+
+        * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
+          client used to query logs. It will be used in a separate thread,
+          so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
+          it should not be reused after the thread is started.
+
+        * filters: arvados.events.Filter | None --- One event filter to
+          subscribe to after connecting to the WebSocket server. If not
+          specified, the client will subscribe to all events.
+
+        * on_event: arvados.events.EventCallback --- When the client
+          receives an event from the WebSocket server, it calls this
+          function with the event object.
+
+        * poll_time: float --- The number of seconds to wait between querying
+          logs. Default 15.
+
+        * last_log_id: int | None --- If specified, queries will include a
+          filter for logs with an `id` at least this value.
+        """
         super(PollClient, self).__init__()
         self.api = api
         if filters:
@@ -294,6 +341,11 @@ class PollClient(threading.Thread):
             self._skip_old_events = False
 
     def run(self):
+        """Run the client loop
+
+        This method runs in a separate thread to poll and process events
+        from the server.
+        """
         self.on_event({'status': 200})
 
         while not self._closing.is_set():
@@ -382,23 +434,29 @@ class PollClient(threading.Thread):
                 self._closing.wait(self.poll_time)
 
     def run_forever(self):
+        """Run the polling client indefinitely
+
+        This method blocks until the `close` method is called (e.g., from
+        another thread) or the client permanently loses its connection.
+        """
         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
         while not self._closing.is_set():
             self._closing.wait(1)
 
-    def close(self, code=None, reason=None, timeout=0):
-        """Close poll client and optionally wait for it to finish.
+    def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
+        """Stop polling and processing events
 
-        If an :on_event: handler is running in a different thread,
-        first wait (indefinitely) for it to return.
+        Arguments:
 
-        After closing, wait up to :timeout: seconds for the thread to
-        finish the poll request in progress (if any).
+        * code: Optional[int] --- Ignored; this argument exists for API
+          compatibility with `EventClient.close`.
 
-        :code: and :reason: are ignored. They are present for
-        interface compatibility with EventClient.
-        """
+        * reason: Optional[str] --- Ignored; this argument exists for API
+          compatibility with `EventClient.close`.
 
+        * timeout: float --- How long to wait for the client thread to finish
+          processing events. Default 0, which means no timeout.
+        """
         with self._closing_lock:
             self._closing.set()
         try:
@@ -410,11 +468,27 @@ class PollClient(threading.Thread):
             # to do so raises the same exception."
             pass
 
-    def subscribe(self, f):
+    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
+        """Subscribe to another set of events from the server
+
+        Arguments:
+
+        * f: arvados.events.Filter | None --- One filter to subscribe to.
+
+        * last_log_id: Optional[int] --- Ignored; this argument exists for
+          API compatibility with `EventClient.subscribe`.
+        """
         self.on_event({'status': 200})
         self.filters.append(f)
 
     def unsubscribe(self, f):
+        """Unsubscribe from an event stream
+
+        Arguments:
+
+        * f: arvados.events.Filter | None --- One event filter to stop
+        receiving events for.
+        """
         del self.filters[self.filters.index(f)]
 
 
@@ -432,21 +506,42 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     else:
         return client
 
-
-def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
-    """
-    :api:
-      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
-    :filters:
-      Initial subscription filters.
-    :on_event:
-      The callback when a message is received.
-    :poll_fallback:
-      If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
-    :last_log_id:
-      Log rows that are newer than the log id
+def subscribe(
+        api: 'arvados.api_resources.ArvadosAPIClient',
+        filters: Optional[Filter],
+        on_event: EventCallback,
+        poll_fallback: float=15,
+        last_log_id: Optional[int]=None,
+) -> Union[EventClient, PollClient]:
+    """Start a thread to monitor events
+
+    This method tries to construct an `EventClient` to process Arvados
+    events via WebSockets. If that fails, or the
+    `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
+    back to constructing a `PollClient` to process the events via API
+    polling.
+
+    Arguments:
+
+    * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
+      client used to query logs. It may be used in a separate thread,
+      so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
+      it should not be reused after this method returns.
+
+    * filters: arvados.events.Filter | None --- One event filter to
+      subscribe to after initializing the client. If not specified, the
+      client will subscribe to all events.
+
+    * on_event: arvados.events.EventCallback --- When the client receives an
+      event, it calls this function with the event object.
+
+    * poll_time: float --- The number of seconds to wait between querying
+      logs. If 0, this function will refuse to construct a `PollClient`.
+      Default 15.
+
+    * last_log_id: int | None --- If specified, start processing events with
+      at least this `id` value.
     """
-
     if not poll_fallback:
         return _subscribe_websocket(api, filters, on_event, last_log_id)