17995: Merge branch 'main'
[arvados.git] / sdk / python / arvados / events.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future import standard_library
7 standard_library.install_aliases()
8 from builtins import str
9 from builtins import object
10 import arvados
11 from . import config
12 from . import errors
13 from .retry import RetryLoop
14
15 import logging
16 import json
17 import _thread
18 import threading
19 import time
20 import os
21 import re
22 import ssl
23 from ws4py.client.threadedclient import WebSocketClient
24
25 _logger = logging.getLogger('arvados.events')
26
27
class _EventClient(WebSocketClient):
    """Websocket client that decodes JSON messages and hands them to a callback.

    Subscriptions for every entry in ``filters`` are sent as soon as the
    connection opens.  ``on_closed`` is invoked (with no arguments)
    whenever ws4py reports that the connection has closed.
    """

    def __init__(self, url, filters, on_event, last_log_id, on_closed):
        ca_certs = arvados.util.ca_certs_path()
        insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        # Certificate verification is skipped only when the user has
        # explicitly flagged the API host as insecure.
        ssl_options = {
            'ca_certs': ca_certs,
            'cert_reqs': ssl.CERT_NONE if insecure else ssl.CERT_REQUIRED,
        }

        # Warning: If the host part of url resolves to both IPv6 and
        # IPv4 addresses (common with "localhost"), only one of them
        # will be attempted -- and it might not be the right one. See
        # ws4py's WebSocketBaseClient.__init__.
        super(_EventClient, self).__init__(url, ssl_options=ssl_options)

        self.filters = filters
        self.on_event = on_event
        self.on_closed = on_closed
        self.last_log_id = last_log_id
        # _closing is guarded by _closing_lock; _closed is set once
        # closed() has been called.
        self._closing_lock = threading.RLock()
        self._closing = False
        self._closed = threading.Event()

    def opened(self):
        """Send a subscribe request for each configured filter."""
        for subscription in self.filters:
            self.subscribe(subscription, self.last_log_id)

    def closed(self, code, reason=None):
        """Record that the connection is closed, then notify the owner."""
        self._closed.set()
        self.on_closed()

    def received_message(self, m):
        """Decode an incoming message as JSON and pass it to on_event.

        Messages that arrive after close() has marked the client as
        closing are dropped.
        """
        with self._closing_lock:
            if self._closing:
                return
            self.on_event(json.loads(str(m)))

    def close(self, code=1000, reason='', timeout=0):
        """Close event client and optionally wait for it to finish.

        :timeout: is the number of seconds to wait for ws4py to
        indicate that the connection has closed.
        """
        super(_EventClient, self).close(code, reason)
        # From here on, received_message() discards anything it gets.
        with self._closing_lock:
            self._closing = True
        # Block until closed() fires, or until the timeout expires.
        self._closed.wait(timeout=timeout)

    def subscribe(self, f, last_log_id=None):
        """Send a subscribe request for filter f.

        last_log_id is included in the request only when it is not None.
        """
        request = {"method": "subscribe", "filters": f}
        if last_log_id is not None:
            request["last_log_id"] = last_log_id
        self.send(json.dumps(request))

    def unsubscribe(self, f):
        """Send an unsubscribe request for filter f."""
        request = {"method": "unsubscribe", "filters": f}
        self.send(json.dumps(request))
84
85
class EventClient(object):
    """Websocket event subscription that reconnects after unexpected closes.

    Wraps an _EventClient connection.  Each received event is passed
    to on_event_cb; the id of the most recent event is remembered in
    last_log_id and handed to the replacement connection when the
    websocket has to be re-established.

    :url: websocket URL to connect to.
    :filters: initial subscription filter (a falsy value subscribes
      with an empty filter).
    :on_event_cb: callable invoked with each decoded event.
    :last_log_id: id passed along with the initial subscribe requests,
      or None.

    The constructor connects immediately and re-raises any connection
    error.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        self.url = url
        self.filters = [filters] if filters else [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        # Set by close(), and by on_closed() when reconnecting fails.
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        """Create a fresh _EventClient and connect it.

        If connecting raises, the underlying connection is closed and
        the exception is re-raised.
        """
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        """Add filter f and send a subscribe request for it."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Remove filter f (ValueError if absent) and unsubscribe from it."""
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Close the connection deliberately.

        is_closed is set first so that on_closed() does not try to
        reconnect.  :timeout: is forwarded to _EventClient.close().
        """
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Remember the event's id (if any) and invoke the user callback.

        An exception raised by the callback is logged and the main
        thread is interrupted.
        """
        event_id = m.get('id')
        if event_id is not None:
            self.last_log_id = event_id
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def on_closed(self):
        """Reconnect after a close that was not requested via close().

        Each iteration of the RetryLoop makes one reconnect attempt.
        If every attempt fails, the client is marked closed and the
        main thread is interrupted.
        """
        if self.is_closed.is_set():
            return

        _logger.warning("Unexpected close. Reconnecting.")
        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            try:
                self._setup_event_client()
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
            else:
                # Return on success rather than inspecting the loop
                # counter afterwards, so that a reconnect which succeeds
                # on the final attempt is not mistaken for a failure.
                _logger.warning("Reconnect successful.")
                return

        # Only reached when no attempt succeeded.  There is no active
        # exception here, so log with error() rather than exception().
        _logger.error("EventClient thread could not contact websocket server.")
        self.is_closed.set()
        _thread.interrupt_main()

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass
149
150
class PollClient(threading.Thread):
    """Daemon thread that delivers events by polling the logs API.

    :api: API client object; logs().list(...).execute() is called on it.
    :filters: initial subscription filter (a falsy value subscribes
      with an empty filter).
    :on_event: callable invoked with each event, and with
      {'status': 200} when polling starts and on each subscribe().
    :poll_time: seconds to wait between polls when no more items are
      pending; also used as the RetryLoop max_wait.
    :last_log_id: id of the last event the caller has already seen,
      or None to start from the present.
    """

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        super(PollClient, self).__init__()
        self.api = api
        self.filters = [filters] if filters else [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        # Held while an on_event handler runs, so that close() waits
        # for a handler in progress.
        self._closing_lock = threading.RLock()

        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection.
            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see "if not self._skip_old_events"
            # in _fetch_events()).
            self._skip_old_events = False

    def _fetch_events(self, f):
        """Run one logs query for filter f and return the response dict.

        The response has "items" and "items_available" keys.
        Exceptions from the API call propagate to the caller.
        """
        if self._skip_old_events:
            # In this case, either we know the most recent matching
            # ID, or we know there were no matching events in the
            # 2-hour window before subscribing. Either way we can
            # safely ask for events in ascending order.
            return self.api.logs().list(
                order="id asc",
                filters=f+self._skip_old_events).execute()

        # If the caller didn't provide a known recent ID, our first
        # request will ask for the single most recent event from the
        # last 2 hours (the time restriction avoids doing an expensive
        # database query, and leaves a big enough margin to account
        # for clock skew). If we do find a recent event, we remember
        # its ID but then discard it (we are supposed to be returning
        # new/current events, not old ones).
        #
        # Subsequent requests will get multiple events in
        # chronological order, and filter on that same cutoff time, or
        # (once we see our first matching event) the ID of the
        # last-seen event.
        #
        # Note: self._skip_old_events must not be set until the
        # threshold is decided. Otherwise, tests will be unreliable.
        filter_by_time = [[
            "created_at", ">=",
            time.strftime(
                "%Y-%m-%dT%H:%M:%SZ",
                time.gmtime(time.time()-7200))]]
        items = self.api.logs().list(
            order="id desc",
            limit=1,
            filters=f+filter_by_time).execute()
        if items["items"]:
            self._skip_old_events = [
                ["id", ">", str(items["items"][0]["id"])]]
            return {
                "items": [],
                "items_available": 0,
            }
        # No recent events. We can keep using the same timestamp
        # threshold until we receive our first new event.
        self._skip_old_events = filter_by_time
        return items

    def run(self):
        """Poll for new events until closed, passing each to on_event.

        If the API server cannot be reached after all retries, the
        client is marked closing and the main thread is interrupted.
        """
        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            for f in self.filters:
                for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        items = self._fetch_events(f)
                    except errors.ApiError as error:
                        _logger.debug(
                            "PollClient API request failed, will retry: %s",
                            error)
                    else:
                        break
                else:
                    # Every attempt raised ApiError.  There is no
                    # active exception here, so log with error()
                    # rather than exception().
                    _logger.error("PollClient thread could not contact API server.")
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    self._skip_old_events = [["id", ">", str(i["id"])]]
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    # The server has more matching rows than it
                    # returned; poll again without sleeping.
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f):
        """Report {'status': 200} to on_event, then add filter f."""
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Remove filter f (ValueError if it is not subscribed)."""
        self.filters.remove(f)
299
300
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    """Open a websocket EventClient for the server that api talks to.

    The websocket URL is read from the 'websocketUrl' entry of
    api._rootDesc, and api.api_token is appended to it as a query
    parameter.

    Raises errors.FeatureNotEnabledError if no websocket URL is
    advertised.  Any exception raised while creating the EventClient
    is logged as a warning and re-raised.
    """
    websocket_url = api._rootDesc.get('websocketUrl', None)
    if not websocket_url:
        raise errors.FeatureNotEnabledError(
            "Server does not advertise a websocket endpoint")

    authenticated_url = "{}?api_token={}".format(websocket_url, api.api_token)
    try:
        return EventClient(authenticated_url, filters, on_event, last_log_id)
    except Exception:
        # Log the URL without the token, then let the caller decide
        # what to do about the failure.
        _logger.warning("Failed to connect to websockets on %s" % websocket_url)
        raise
314
315
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """Subscribe to events, over websockets or by polling.

    :api:
      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available, fall back to polling every N seconds.  If poll_fallback is false, only websockets are tried and any error from the websocket connection attempt is raised to the caller.
    :last_log_id:
      If given, the id of the last log row already seen; it is passed on to the client so that only newer rows are requested.

    Returns the websocket client, or a started PollClient when falling
    back to polling.
    """

    if not poll_fallback:
        # No fallback allowed: websocket errors propagate.
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        websockets_disabled = config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS')
        if not websockets_disabled:
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        _logger.warning("Falling back to polling after websocket error: %s" % e)

    poller = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    poller.start()
    return poller