Merge branch '21815-trigrams-exclude-ids'
[arvados.git] / sdk / python / arvados / events.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Follow events on an Arvados cluster
5
6 This module provides different ways to get notified about events that happen
7 on an Arvados cluster. You indicate which events you want updates about, and
8 provide a function that is called any time one of those events is received
9 from the server.
10
11 `subscribe` is the main entry point. It helps you construct one of the two
12 API-compatible client classes: `EventClient` (which uses WebSockets) or
13 `PollClient` (which periodically queries the logs list methods).
14 """
15
16 import enum
17 import json
18 import logging
19 import os
20 import re
21 import ssl
22 import sys
23 import _thread
24 import threading
25 import time
26
27 import websockets.exceptions as ws_exc
28 import websockets.sync.client as ws_client
29
30 from . import config
31 from . import errors
32 from . import util
33 from .retry import RetryLoop
34 from ._version import __version__
35
36 from typing import (
37     Any,
38     Callable,
39     Dict,
40     Iterable,
41     List,
42     Optional,
43     Union,
44 )
45
46 EventCallback = Callable[[Dict[str, Any]], object]
47 """Type signature for an event handler callback"""
48 FilterCondition = List[Union[None, str, 'Filter']]
49 """Type signature for a single filter condition"""
50 Filter = List[FilterCondition]
51 """Type signature for an entire filter"""
52
53 _logger = logging.getLogger('arvados.events')
54
55 class WSMethod(enum.Enum):
56     """Arvados WebSocket methods
57
58     This enum represents valid values for the `method` field in messages
59     sent to an Arvados WebSocket server.
60     """
61     SUBSCRIBE = 'subscribe'
62     SUB = SUBSCRIBE
63     UNSUBSCRIBE = 'unsubscribe'
64     UNSUB = UNSUBSCRIBE
65
66
67 class EventClient(threading.Thread):
68     """Follow Arvados events via WebSocket
69
70     EventClient follows events on Arvados cluster published by the WebSocket
71     server. Users can select the events they want to follow and run their own
72     callback function on each.
73     """
74     _USER_AGENT = 'Python/{}.{}.{} arvados.events/{}'.format(
75         *sys.version_info[:3],
76         __version__,
77     )
78
79     def __init__(
80             self,
81             url: str,
82             filters: Optional[Filter],
83             on_event_cb: EventCallback,
84             last_log_id: Optional[int]=None,
85             *,
86             insecure: Optional[bool]=None,
87     ) -> None:
88         """Initialize a WebSocket client
89
90         Constructor arguments:
91
92         * url: str --- The `wss` URL for an Arvados WebSocket server.
93
94         * filters: arvados.events.Filter | None --- One event filter to
95           subscribe to after connecting to the WebSocket server. If not
96           specified, the client will subscribe to all events.
97
98         * on_event_cb: arvados.events.EventCallback --- When the client
99           receives an event from the WebSocket server, it calls this
100           function with the event object.
101
102         * last_log_id: int | None --- If specified, this will be used as the
103           value for the `last_log_id` field in subscribe messages sent by
104           the client.
105
106         Constructor keyword arguments:
107
108         * insecure: bool | None --- If `True`, the client will not check the
109           validity of the server's TLS certificate. If not specified, uses
110           the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
111         """
112         self.url = url
113         self.filters = [filters or []]
114         self.on_event_cb = on_event_cb
115         self.last_log_id = last_log_id
116         self.is_closed = threading.Event()
117         self._ssl_ctx = ssl.create_default_context(
118             purpose=ssl.Purpose.SERVER_AUTH,
119             cafile=util.ca_certs_path(),
120         )
121         if insecure is None:
122             insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
123         if insecure:
124             self._ssl_ctx.check_hostname = False
125             self._ssl_ctx.verify_mode = ssl.CERT_NONE
126         self._subscribe_lock = threading.Lock()
127         self._connect()
128         super().__init__(daemon=True)
129         self.start()
130
131     def _connect(self) -> None:
132         # There are no locks protecting this method. After the thread starts,
133         # it should only be called from inside.
134         self._client = ws_client.connect(
135             self.url,
136             logger=_logger,
137             ssl_context=self._ssl_ctx,
138             user_agent_header=self._USER_AGENT,
139         )
140         self._client_ok = True
141
142     def _subscribe(self, f: Filter, last_log_id: Optional[int]) -> None:
143         extra = {}
144         if last_log_id is not None:
145             extra['last_log_id'] = last_log_id
146         return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)
147
148     def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
149         msg = json.dumps({
150             'method': method.value,
151             'filters': f,
152             **extra,
153         })
154         self._client.send(msg)
155
156     def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
157         """Close the WebSocket connection and stop processing events
158
159         Arguments:
160
161         * code: int --- The WebSocket close code sent to the server when
162           disconnecting. Default 1000.
163
164         * reason: str --- The WebSocket close reason sent to the server when
165           disconnecting. Default is an empty string.
166
167         * timeout: float --- How long to wait for the WebSocket server to
168           acknowledge the disconnection, in seconds. Default 0, which means
169           no timeout.
170         """
171         self.is_closed.set()
172         self._client.close_timeout = timeout or None
173         self._client.close(code, reason)
174
175     def run_forever(self) -> None:
176         """Run the WebSocket client indefinitely
177
178         This method blocks until the `close` method is called (e.g., from
179         another thread) or the client permanently loses its connection.
180         """
181         # Have to poll here to let KeyboardInterrupt get raised.
182         while not self.is_closed.wait(1):
183             pass
184
185     def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
186         """Subscribe to another set of events from the server
187
188         Arguments:
189
190         * f: arvados.events.Filter | None --- One filter to subscribe to
191           events for.
192
193         * last_log_id: int | None --- If specified, request events starting
194           from this id. If not specified, the server will only send events
195           that occur after processing the subscription.
196         """
197         with self._subscribe_lock:
198             self._subscribe(f, last_log_id)
199             self.filters.append(f)
200
201     def unsubscribe(self, f: Filter) -> None:
202         """Unsubscribe from an event stream
203
204         Arguments:
205
206         * f: arvados.events.Filter | None --- One event filter to stop
207         receiving events for.
208         """
209         with self._subscribe_lock:
210             try:
211                 index = self.filters.index(f)
212             except ValueError:
213                 raise ValueError(f"filter not subscribed: {f!r}") from None
214             self._update_sub(WSMethod.UNSUBSCRIBE, f)
215             del self.filters[index]
216
217     def on_closed(self) -> None:
218         """Handle disconnection from the WebSocket server
219
220         This method is called when the client loses its connection from
221         receiving events. This implementation tries to establish a new
222         connection if it was not closed client-side.
223         """
224         if self.is_closed.is_set():
225             return
226         _logger.warning("Unexpected close. Reconnecting.")
227         for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
228             try:
229                 self._connect()
230             except Exception as e:
231                 _logger.warning("Error '%s' during websocket reconnect.", e)
232             else:
233                 _logger.warning("Reconnect successful.")
234                 break
235         else:
236             _logger.error("EventClient thread could not contact websocket server.")
237             self.is_closed.set()
238             _thread.interrupt_main()
239
240     def on_event(self, m: Dict[str, Any]) -> None:
241         """Handle an event from the WebSocket server
242
243         This method is called whenever the client receives an event from the
244         server. This implementation records the `id` field internally, then
245         calls the callback function provided at initialization time.
246
247         Arguments:
248
249         * m: Dict[str, Any] --- The event object, deserialized from JSON.
250         """
251         try:
252             self.last_log_id = m['id']
253         except KeyError:
254             pass
255         try:
256             self.on_event_cb(m)
257         except Exception:
258             _logger.exception("Unexpected exception from event callback.")
259             _thread.interrupt_main()
260
261     def run(self) -> None:
262         """Run the client loop
263
264         This method runs in a separate thread to receive and process events
265         from the server.
266         """
267         self.name = f'ArvadosWebsockets-{self.ident}'
268         while self._client_ok and not self.is_closed.is_set():
269             try:
270                 with self._subscribe_lock:
271                     for f in self.filters:
272                         self._subscribe(f, self.last_log_id)
273                 for msg_s in self._client:
274                     if not self.is_closed.is_set():
275                         msg = json.loads(msg_s)
276                         self.on_event(msg)
277             except ws_exc.ConnectionClosed:
278                 self._client_ok = False
279                 self.on_closed()
280
281
282 class PollClient(threading.Thread):
283     """Follow Arvados events via polling logs
284
285     PollClient follows events on Arvados cluster by periodically running
286     logs list API calls. Users can select the events they want to follow and
287     run their own callback function on each.
288     """
289     def __init__(
290             self,
291             api: 'arvados.api_resources.ArvadosAPIClient',
292             filters: Optional[Filter],
293             on_event: EventCallback,
294             poll_time: float=15,
295             last_log_id: Optional[int]=None,
296     ) -> None:
297         """Initialize a polling client
298
299         Constructor arguments:
300
301         * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
302           client used to query logs. It will be used in a separate thread,
303           so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
304           it should not be reused after the thread is started.
305
306         * filters: arvados.events.Filter | None --- One event filter to
307           subscribe to after connecting to the WebSocket server. If not
308           specified, the client will subscribe to all events.
309
310         * on_event: arvados.events.EventCallback --- When the client
311           receives an event from the WebSocket server, it calls this
312           function with the event object.
313
314         * poll_time: float --- The number of seconds to wait between querying
315           logs. Default 15.
316
317         * last_log_id: int | None --- If specified, queries will include a
318           filter for logs with an `id` at least this value.
319         """
320         super(PollClient, self).__init__()
321         self.api = api
322         if filters:
323             self.filters = [filters]
324         else:
325             self.filters = [[]]
326         self.on_event = on_event
327         self.poll_time = poll_time
328         self.daemon = True
329         self.last_log_id = last_log_id
330         self._closing = threading.Event()
331         self._closing_lock = threading.RLock()
332
333         if self.last_log_id != None:
334             # Caller supplied the last-seen event ID from a previous
335             # connection.
336             self._skip_old_events = [["id", ">", str(self.last_log_id)]]
337         else:
338             # We need to do a reverse-order query to find the most
339             # recent event ID (see "if not self._skip_old_events"
340             # in run()).
341             self._skip_old_events = False
342
343     def run(self):
344         """Run the client loop
345
346         This method runs in a separate thread to poll and process events
347         from the server.
348         """
349         self.on_event({'status': 200})
350
351         while not self._closing.is_set():
352             moreitems = False
353             for f in self.filters:
354                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
355                     try:
356                         if not self._skip_old_events:
357                             # If the caller didn't provide a known
358                             # recent ID, our first request will ask
359                             # for the single most recent event from
360                             # the last 2 hours (the time restriction
361                             # avoids doing an expensive database
362                             # query, and leaves a big enough margin to
363                             # account for clock skew). If we do find a
364                             # recent event, we remember its ID but
365                             # then discard it (we are supposed to be
366                             # returning new/current events, not old
367                             # ones).
368                             #
369                             # Subsequent requests will get multiple
370                             # events in chronological order, and
371                             # filter on that same cutoff time, or
372                             # (once we see our first matching event)
373                             # the ID of the last-seen event.
374                             #
375                             # Note: self._skip_old_events must not be
376                             # set until the threshold is decided.
377                             # Otherwise, tests will be unreliable.
378                             filter_by_time = [[
379                                 "created_at", ">=",
380                                 time.strftime(
381                                     "%Y-%m-%dT%H:%M:%SZ",
382                                     time.gmtime(time.time()-7200))]]
383                             items = self.api.logs().list(
384                                 order="id desc",
385                                 limit=1,
386                                 filters=f+filter_by_time).execute()
387                             if items["items"]:
388                                 self._skip_old_events = [
389                                     ["id", ">", str(items["items"][0]["id"])]]
390                                 items = {
391                                     "items": [],
392                                     "items_available": 0,
393                                 }
394                             else:
395                                 # No recent events. We can keep using
396                                 # the same timestamp threshold until
397                                 # we receive our first new event.
398                                 self._skip_old_events = filter_by_time
399                         else:
400                             # In this case, either we know the most
401                             # recent matching ID, or we know there
402                             # were no matching events in the 2-hour
403                             # window before subscribing. Either way we
404                             # can safely ask for events in ascending
405                             # order.
406                             items = self.api.logs().list(
407                                 order="id asc",
408                                 filters=f+self._skip_old_events).execute()
409                         break
410                     except errors.ApiError as error:
411                         pass
412                     else:
413                         tries_left = 0
414                         break
415                 if tries_left == 0:
416                     _logger.exception("PollClient thread could not contact API server.")
417                     with self._closing_lock:
418                         self._closing.set()
419                     _thread.interrupt_main()
420                     return
421                 for i in items["items"]:
422                     self._skip_old_events = [["id", ">", str(i["id"])]]
423                     with self._closing_lock:
424                         if self._closing.is_set():
425                             return
426                         try:
427                             self.on_event(i)
428                         except Exception as e:
429                             _logger.exception("Unexpected exception from event callback.")
430                             _thread.interrupt_main()
431                 if items["items_available"] > len(items["items"]):
432                     moreitems = True
433             if not moreitems:
434                 self._closing.wait(self.poll_time)
435
436     def run_forever(self):
437         """Run the polling client indefinitely
438
439         This method blocks until the `close` method is called (e.g., from
440         another thread) or the client permanently loses its connection.
441         """
442         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
443         while not self._closing.is_set():
444             self._closing.wait(1)
445
446     def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
447         """Stop polling and processing events
448
449         Arguments:
450
451         * code: Optional[int] --- Ignored; this argument exists for API
452           compatibility with `EventClient.close`.
453
454         * reason: Optional[str] --- Ignored; this argument exists for API
455           compatibility with `EventClient.close`.
456
457         * timeout: float --- How long to wait for the client thread to finish
458           processing events. Default 0, which means no timeout.
459         """
460         with self._closing_lock:
461             self._closing.set()
462         try:
463             self.join(timeout=timeout)
464         except RuntimeError:
465             # "join() raises a RuntimeError if an attempt is made to join the
466             # current thread as that would cause a deadlock. It is also an
467             # error to join() a thread before it has been started and attempts
468             # to do so raises the same exception."
469             pass
470
471     def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
472         """Subscribe to another set of events from the server
473
474         Arguments:
475
476         * f: arvados.events.Filter | None --- One filter to subscribe to.
477
478         * last_log_id: Optional[int] --- Ignored; this argument exists for
479           API compatibility with `EventClient.subscribe`.
480         """
481         self.on_event({'status': 200})
482         self.filters.append(f)
483
484     def unsubscribe(self, f):
485         """Unsubscribe from an event stream
486
487         Arguments:
488
489         * f: arvados.events.Filter | None --- One event filter to stop
490         receiving events for.
491         """
492         del self.filters[self.filters.index(f)]
493
494
495 def _subscribe_websocket(api, filters, on_event, last_log_id=None):
496     endpoint = api._rootDesc.get('websocketUrl', None)
497     if not endpoint:
498         raise errors.FeatureNotEnabledError(
499             "Server does not advertise a websocket endpoint")
500     uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
501     try:
502         client = EventClient(uri_with_token, filters, on_event, last_log_id)
503     except Exception:
504         _logger.warning("Failed to connect to websockets on %s" % endpoint)
505         raise
506     else:
507         return client
508
509 def subscribe(
510         api: 'arvados.api_resources.ArvadosAPIClient',
511         filters: Optional[Filter],
512         on_event: EventCallback,
513         poll_fallback: float=15,
514         last_log_id: Optional[int]=None,
515 ) -> Union[EventClient, PollClient]:
516     """Start a thread to monitor events
517
518     This method tries to construct an `EventClient` to process Arvados
519     events via WebSockets. If that fails, or the
520     `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
521     back to constructing a `PollClient` to process the events via API
522     polling.
523
524     Arguments:
525
526     * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
527       client used to query logs. It may be used in a separate thread,
528       so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
529       it should not be reused after this method returns.
530
531     * filters: arvados.events.Filter | None --- One event filter to
532       subscribe to after initializing the client. If not specified, the
533       client will subscribe to all events.
534
535     * on_event: arvados.events.EventCallback --- When the client receives an
536       event, it calls this function with the event object.
537
538     * poll_time: float --- The number of seconds to wait between querying
539       logs. If 0, this function will refuse to construct a `PollClient`.
540       Default 15.
541
542     * last_log_id: int | None --- If specified, start processing events with
543       at least this `id` value.
544     """
545     if not poll_fallback:
546         return _subscribe_websocket(api, filters, on_event, last_log_id)
547
548     try:
549         if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
550             return _subscribe_websocket(api, filters, on_event, last_log_id)
551         else:
552             _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
553     except Exception as e:
554         _logger.warning("Falling back to polling after websocket error: %s" % e)
555     p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
556     p.start()
557     return p