1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Follow events on an Arvados cluster
6 This module provides different ways to get notified about events that happen
7 on an Arvados cluster. You indicate which events you want updates about, and
8 provide a function that is called any time one of those events is received
11 `subscribe` is the main entry point. It helps you construct one of the two
12 API-compatible client classes: `EventClient` (which uses WebSockets) or
13 `PollClient` (which periodically queries the logs list methods).
27 import websockets.exceptions as ws_exc
28 import websockets.sync.client as ws_client
33 from .retry import RetryLoop
34 from ._version import __version__
46 EventCallback = Callable[[Dict[str, Any]], object]
47 """Type signature for an event handler callback

The callback is called with a single event dictionary as its only argument.
"""
48 FilterCondition = List[Union[None, str, 'Filter']]
49 """Type signature for a single filter condition

For example: `["id", ">", "123"]`.
"""
50 Filter = List[FilterCondition]
51 """Type signature for an entire filter: a list of filter conditions"""
# Module-level logger used by the event clients below to report warnings
# and errors.
53 _logger = logging.getLogger('arvados.events')
55 class WSMethod(enum.Enum):
56 """Arvados WebSocket methods
58 This enum represents valid values for the `method` field in messages
59 sent to an Arvados WebSocket server.
# Sent by EventClient._subscribe to start receiving events matching a filter.
61 SUBSCRIBE = 'subscribe'
# Sent by EventClient.unsubscribe to stop receiving events for a filter.
63 UNSUBSCRIBE = 'unsubscribe'
67 class EventClient(threading.Thread):
68 """Follow Arvados events via WebSocket
70 EventClient follows events on an Arvados cluster published by the WebSocket
71 server. Users can select the events they want to follow and run their own
72 callback function on each.
# User-Agent header sent when connecting (see _connect): the Python version
# from sys.version_info, presumably followed by this package's __version__.
74 _USER_AGENT = 'Python/{}.{}.{} arvados.events/{}'.format(
75 *sys.version_info[:3],
82 filters: Optional[Filter],
83 on_event_cb: EventCallback,
84 last_log_id: Optional[int]=None,
86 insecure: Optional[bool]=None,
88 """Initialize a WebSocket client
90 Constructor arguments:
92 * url: str --- The `wss` URL for an Arvados WebSocket server.
94 * filters: arvados.events.Filter | None --- One event filter to
95 subscribe to after connecting to the WebSocket server. If not
96 specified, the client will subscribe to all events.
98 * on_event_cb: arvados.events.EventCallback --- When the client
99 receives an event from the WebSocket server, it calls this
100 function with the event object.
102 * last_log_id: int | None --- If specified, this will be used as the
103 value for the `last_log_id` field in subscribe messages sent by
106 Constructor keyword arguments:
108 * insecure: bool | None --- If `True`, the client will not check the
109 validity of the server's TLS certificate. If not specified, uses
110 the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
# All active filters. An omitted filter becomes [] (subscribe to all
# events). subscribe()/unsubscribe() add and remove entries, and run()
# resends every entry at the start of each connection loop.
113 self.filters = [filters or []]
114 self.on_event_cb = on_event_cb
115 self.last_log_id = last_log_id
# Checked by run(), run_forever() and on_closed() to detect a client-side
# shutdown (presumably set by close() -- confirm).
116 self.is_closed = threading.Event()
# TLS context for the wss connection, trusting the CA bundle returned by
# util.ca_certs_path().
117 self._ssl_ctx = ssl.create_default_context(
118 purpose=ssl.Purpose.SERVER_AUTH,
119 cafile=util.ca_certs_path(),
# No explicit `insecure` argument: fall back to the user's
# ARVADOS_API_HOST_INSECURE setting.
122 insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
# check_hostname must be cleared before verify_mode can be set to
# CERT_NONE; the ssl module raises ValueError otherwise.
124 self._ssl_ctx.check_hostname = False
125 self._ssl_ctx.verify_mode = ssl.CERT_NONE
# Guards self.filters and the subscription messages; held by subscribe(),
# unsubscribe() and run().
126 self._subscribe_lock = threading.Lock()
# Daemon thread: it does not keep the interpreter alive at exit.
128 super().__init__(daemon=True)
131 def _connect(self) -> None:
132 # There are no locks protecting this method. After the thread starts,
133 # it should only be called from inside that thread.
# Open a new WebSocket connection using the TLS context and User-Agent
# prepared in __init__.
134 self._client = ws_client.connect(
137 ssl_context=self._ssl_ctx,
138 user_agent_header=self._USER_AGENT,
# run() keeps looping while this is True; it clears it on ConnectionClosed.
140 self._client_ok = True
142 def _subscribe(self, f: Filter, last_log_id: Optional[int]) -> None:
# Send a subscribe message for filter `f`. `last_log_id` is added to the
# message only when it is not None. Not locked itself: both callers
# (subscribe() and run()) hold self._subscribe_lock.
144 if last_log_id is not None:
145 extra['last_log_id'] = last_log_id
146 return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)
148 def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
# Build a subscription message whose `method` field is `method.value` for
# filter `f` (presumably JSON-encoded, with `extra` fields such as
# last_log_id merged in -- confirm) and send it on the current connection.
150 'method': method.value,
154 self._client.send(msg)
156 def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
157 """Close the WebSocket connection and stop processing events
161 * code: int --- The WebSocket close code sent to the server when
162 disconnecting. Default 1000.
164 * reason: str --- The WebSocket close reason sent to the server when
165 disconnecting. Default is an empty string.
167 * timeout: float --- How long to wait for the WebSocket server to
168 acknowledge the disconnection, in seconds. Default 0, which means
# A timeout of 0 is mapped to None, which the websockets client treats as
# "no timeout" while waiting for the server to acknowledge the close.
172 self._client.close_timeout = timeout or None
173 self._client.close(code, reason)
175 def run_forever(self) -> None:
176 """Run the WebSocket client indefinitely
178 This method blocks until the `close` method is called (e.g., from
179 another thread) or the client permanently loses its connection.
181 # Have to poll here to let KeyboardInterrupt get raised: waiting in
# 1-second slices gives the main thread a chance to handle the signal.
182 while not self.is_closed.wait(1):
185 def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
186 """Subscribe to another set of events from the server
190 * f: arvados.events.Filter | None --- One filter to subscribe to
193 * last_log_id: int | None --- If specified, request events starting
194 from this id. If not specified, the server will only send events
195 that occur after processing the subscription.
197 with self._subscribe_lock:
198 self._subscribe(f, last_log_id)
199 self.filters.append(f)
201 def unsubscribe(self, f: Filter) -> None:
202 """Unsubscribe from an event stream
206 * f: arvados.events.Filter | None --- One event filter to stop
207 receiving events for.
209 with self._subscribe_lock:
# Look the filter up first, so an unknown filter raises ValueError before
# anything is sent to the server.
211 index = self.filters.index(f)
213 raise ValueError(f"filter not subscribed: {f!r}") from None
214 self._update_sub(WSMethod.UNSUBSCRIBE, f)
# Forget the filter only after the unsubscribe message was sent; if sending
# raises, the filter stays in self.filters.
215 del self.filters[index]
217 def on_closed(self) -> None:
218 """Handle disconnection from the WebSocket server
220 This method is called when the client loses its connection while
221 receiving events. This implementation tries to establish a new
222 connection if it was not closed client-side.
# A client-side close (is_closed set) needs no reconnect.
224 if self.is_closed.is_set():
226 _logger.warning("Unexpected close. Reconnecting.")
# Retry with backoff; presumably up to 25 retries, starting at 0.1s and
# capped at 15s between attempts (see arvados.retry.RetryLoop).
227 for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
230 except Exception as e:
231 _logger.warning("Error '%s' during websocket reconnect.", e)
233 _logger.warning("Reconnect successful.")
# Presumably reached only when every retry failed: log, then interrupt the
# main thread (raising KeyboardInterrupt there) so the failure is not silent.
236 _logger.error("EventClient thread could not contact websocket server.")
238 _thread.interrupt_main()
240 def on_event(self, m: Dict[str, Any]) -> None:
241 """Handle an event from the WebSocket server
243 This method is called whenever the client receives an event from the
244 server. This implementation records the `id` field internally, then
245 calls the callback function provided at initialization time.
249 * m: Dict[str, Any] --- The event object, deserialized from JSON.
# Remember the newest event id; run() sends it as last_log_id when it
# resubscribes.
252 self.last_log_id = m['id']
# Errors from the user callback are logged and surfaced by interrupting the
# main thread (raising KeyboardInterrupt there).
258 _logger.exception("Unexpected exception from event callback.")
259 _thread.interrupt_main()
261 def run(self) -> None:
262 """Run the client loop
264 This method runs in a separate thread to receive and process events
# NOTE(review): Thread.setName() is deprecated since Python 3.10; assigning
# self.name is the modern equivalent.
267 self.setName(f'ArvadosWebsockets-{self.ident}')
# Keep going until the connection is marked unusable or a client-side close
# has been flagged.
268 while self._client_ok and not self.is_closed.is_set():
# (Re)send every active filter, resuming after the last seen event id. The
# lock keeps this from racing with subscribe()/unsubscribe().
270 with self._subscribe_lock:
271 for f in self.filters:
272 self._subscribe(f, self.last_log_id)
273 for msg_s in self._client:
# Messages that arrive after a client-side close are not processed.
274 if not self.is_closed.is_set():
275 msg = json.loads(msg_s)
277 except ws_exc.ConnectionClosed:
# Connection lost: mark it unusable so this loop stops using it.
278 self._client_ok = False
282 class PollClient(threading.Thread):
283 """Follow Arvados events via polling logs
285 PollClient follows events on an Arvados cluster by periodically running
286 logs list API calls. Users can select the events they want to follow and
287 run their own callback function on each.
291 api: 'arvados.api_resources.ArvadosAPIClient',
292 filters: Optional[Filter],
293 on_event: EventCallback,
295 last_log_id: Optional[int]=None,
297 """Initialize a polling client
299 Constructor arguments:
301 * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
302 client used to query logs. It will be used in a separate thread,
303 so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
304 it should not be reused after the thread is started.
306 * filters: arvados.events.Filter | None --- One event filter to
307 poll for once the client thread starts. If not
308 specified, the client will subscribe to all events.
310 * on_event: arvados.events.EventCallback --- When the client
311 receives an event from the API server's logs, it calls this
312 function with the event object.
314 * poll_time: float --- The number of seconds to wait between querying
317 * last_log_id: int | None --- If specified, queries will include a
318 filter for logs with an `id` greater than this value.
320 super(PollClient, self).__init__()
323 self.filters = [filters]
326 self.on_event = on_event
327 self.poll_time = poll_time
329 self.last_log_id = last_log_id
# Set when the client is shutting down; run() and run_forever() also wait
# on it so they wake up early.
330 self._closing = threading.Event()
# Held while run() delivers events and while the closing flag is changed,
# so a close cannot interleave with an in-progress callback.
331 self._closing_lock = threading.RLock()
# NOTE(review): `is not None` is the idiomatic comparison here.
333 if self.last_log_id != None:
334 # Caller supplied the last-seen event ID: only poll for newer events.
336 self._skip_old_events = [["id", ">", str(self.last_log_id)]]
338 # We need to do a reverse-order query to find the most
339 # recent event ID (see "if not self._skip_old_events" in run()).
341 self._skip_old_events = False
344 """Run the client loop
346 This method runs in a separate thread to poll and process events
# Report an initial status event to the callback; subscribe() does the
# same for each filter added later.
349 self.on_event({'status': 200})
351 while not self._closing.is_set():
# NOTE(review): subscribe()/unsubscribe() can append to or delete from
# self.filters from another thread while this loop iterates over it.
353 for f in self.filters:
354 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
356 if not self._skip_old_events:
357 # If the caller didn't provide a known
358 # recent ID, our first request will ask
359 # for the single most recent event from
360 # the last 2 hours (the time restriction
361 # avoids doing an expensive database
362 # query, and leaves a big enough margin to
363 # account for clock skew). If we do find a
364 # recent event, we remember its ID but
365 # then discard it (we are supposed to be
366 # returning new/current events, not old ones).
369 # Subsequent requests will get multiple
370 # events in chronological order, and
371 # filter on that same cutoff time, or
372 # (once we see our first matching event)
373 # the ID of the last-seen event.
375 # Note: self._skip_old_events must not be
376 # set until the threshold is decided.
377 # Otherwise, tests will be unreliable.
381 "%Y-%m-%dT%H:%M:%SZ",
382 time.gmtime(time.time()-7200))]]
383 items = self.api.logs().list(
386 filters=f+filter_by_time).execute()
388 self._skip_old_events = [
389 ["id", ">", str(items["items"][0]["id"])]]
392 "items_available": 0,
395 # No recent events. We can keep using
396 # the same timestamp threshold until
397 # we receive our first new event.
398 self._skip_old_events = filter_by_time
400 # In this case, either we know the most
401 # recent matching ID, or we know there
402 # were no matching events in the 2-hour
403 # window before subscribing. Either way we
404 # can safely ask for events in ascending order.
406 items = self.api.logs().list(
408 filters=f+self._skip_old_events).execute()
410 except errors.ApiError as error:
416 _logger.exception("PollClient thread could not contact API server.")
417 with self._closing_lock:
419 _thread.interrupt_main()
421 for i in items["items"]:
# Advance the cursor past each delivered event so the next query returns
# only newer ones.
422 self._skip_old_events = [["id", ">", str(i["id"])]]
423 with self._closing_lock:
424 if self._closing.is_set():
428 except Exception as e:
429 _logger.exception("Unexpected exception from event callback.")
430 _thread.interrupt_main()
# More matching events exist than this page returned; presumably the next
# query is issued right away instead of sleeping -- confirm.
431 if items["items_available"] > len(items["items"]):
# Sleep between polls, waking early if _closing is set.
434 self._closing.wait(self.poll_time)
436 def run_forever(self):
437 """Run the polling client indefinitely
439 This method blocks until the `close` method is called (e.g., from
440 another thread) or the client permanently loses its connection.
442 # Have to poll here, otherwise KeyboardInterrupt will never get processed.
443 while not self._closing.is_set():
444 self._closing.wait(1)
446 def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
447 """Stop polling and processing events
451 * code: Optional[int] --- Ignored; this argument exists for API
452 compatibility with `EventClient.close`.
454 * reason: Optional[str] --- Ignored; this argument exists for API
455 compatibility with `EventClient.close`.
457 * timeout: float --- How long to wait for the client thread to finish
458 processing events. Default 0, which means no timeout.
# Taking _closing_lock here means the shutdown cannot interleave with an
# event callback that run() is delivering under the same lock.
460 with self._closing_lock:
# NOTE(review): Thread.join(timeout=0) returns immediately instead of
# waiting without limit, which contradicts "0 means no timeout" above.
# EventClient.close maps 0 with `timeout or None`; the same was
# presumably intended here -- confirm.
463 self.join(timeout=timeout)
465 # "join() raises a RuntimeError if an attempt is made to join the
466 # current thread as that would cause a deadlock. It is also an
467 # error to join() a thread before it has been started and attempts
468 # to do so raises the same exception."
471 def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
472 """Subscribe to another set of events from the server
476 * f: arvados.events.Filter | None --- One filter to subscribe to.
478 * last_log_id: Optional[int] --- Ignored; this argument exists for
479 API compatibility with `EventClient.subscribe`.
481 self.on_event({'status': 200})
482 self.filters.append(f)
484 def unsubscribe(self, f):
485 """Unsubscribe from an event stream
489 * f: arvados.events.Filter | None --- One event filter to stop
490 receiving events for.
492 del self.filters[self.filters.index(f)]
495 def _subscribe_websocket(api, filters, on_event, last_log_id=None):
"""Construct an EventClient for the API server's advertised WebSocket endpoint"""
496 endpoint = api._rootDesc.get('websocketUrl', None)
498 raise errors.FeatureNotEnabledError(
499 "Server does not advertise a websocket endpoint")
# The API token travels as a query parameter, so log messages below mention
# only `endpoint`, never this URI.
500 uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
502 client = EventClient(uri_with_token, filters, on_event, last_log_id)
# NOTE(review): prefer lazy logger args (`"... %s", endpoint`) over eager %.
504 _logger.warning("Failed to connect to websockets on %s" % endpoint)
510 api: 'arvados.api_resources.ArvadosAPIClient',
511 filters: Optional[Filter],
512 on_event: EventCallback,
513 poll_fallback: float=15,
514 last_log_id: Optional[int]=None,
515 ) -> Union[EventClient, PollClient]:
516 """Start a thread to monitor events
518 This function tries to construct an `EventClient` to process Arvados
519 events via WebSockets. If that fails, or the
520 `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
521 back to constructing a `PollClient` to process the events via API
526 * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
527 client used to query logs. It may be used in a separate thread,
528 so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
529 it should not be reused after this method returns.
531 * filters: arvados.events.Filter | None --- One event filter to
532 subscribe to after initializing the client. If not specified, the
533 client will subscribe to all events.
535 * on_event: arvados.events.EventCallback --- When the client receives an
536 event, it calls this function with the event object.
538 * poll_fallback: float --- The number of seconds to wait between querying
539 logs. If 0, this function will refuse to construct a `PollClient`.
542 * last_log_id: int | None --- If specified, start processing events with
543 at least this `id` value.
# Polling disabled (poll_fallback is 0): only the WebSocket client is tried.
545 if not poll_fallback:
546 return _subscribe_websocket(api, filters, on_event, last_log_id)
549 if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
550 return _subscribe_websocket(api, filters, on_event, last_log_id)
552 _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
553 except Exception as e:
554 _logger.warning("Falling back to polling after websocket error: %s" % e)
# Fall back to polling; poll_fallback becomes PollClient's poll_time.
555 p = PollClient(api, filters, on_event, poll_fallback, last_log_id)