Merge branch '12020-flaky-py-test'
[arvados.git] / sdk / python / arvados / events.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future import standard_library
7 standard_library.install_aliases()
8 from builtins import str
9 from builtins import object
10 import arvados
11 from . import config
12 from . import errors
13 from .retry import RetryLoop
14
15 import logging
16 import json
17 import _thread
18 import threading
19 import time
20 import os
21 import re
22 import ssl
23 from ws4py.client.threadedclient import WebSocketClient
24
25 _logger = logging.getLogger('arvados.events')
26
27
class _EventClient(WebSocketClient):
    """Low-level ws4py client for the Arvados websocket event stream.

    Once the socket opens, every filter in ``filters`` is subscribed
    (passing ``last_log_id``). Each incoming message is decoded as JSON and
    handed to ``on_event`` unless close() has been called. ``on_closed`` is
    invoked when ws4py reports that the connection has closed.
    """

    def __init__(self, url, filters, on_event, last_log_id, on_closed):
        ca_certs = arvados.util.ca_certs_path()
        # Certificate verification is disabled only on explicit request.
        insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        ssl_options = {
            'ca_certs': ca_certs,
            'cert_reqs': ssl.CERT_NONE if insecure else ssl.CERT_REQUIRED,
        }

        # Warning: If the host part of url resolves to both IPv6 and
        # IPv4 addresses (common with "localhost"), only one of them
        # will be attempted -- and it might not be the right one. See
        # ws4py's WebSocketBaseClient.__init__.
        super(_EventClient, self).__init__(url, ssl_options=ssl_options)

        self.filters = filters
        self.on_event = on_event
        self.last_log_id = last_log_id
        # Serializes close() against received_message() so no callback
        # starts after _closing has been set.
        self._closing_lock = threading.RLock()
        self._closing = False
        # Set by closed() when ws4py reports the connection is down.
        self._closed = threading.Event()
        self.on_closed = on_closed

    def opened(self):
        """ws4py hook: subscribe each configured filter."""
        for flt in self.filters:
            self.subscribe(flt, self.last_log_id)

    def closed(self, code, reason=None):
        """ws4py hook: record the close, then notify the owner."""
        self._closed.set()
        self.on_closed()

    def received_message(self, m):
        """ws4py hook: decode a message and forward it unless closing."""
        with self._closing_lock:
            if self._closing:
                return
            self.on_event(json.loads(str(m)))

    def close(self, code=1000, reason='', timeout=0):
        """Close event client and optionally wait for it to finish.

        :timeout: is the number of seconds to wait for ws4py to
        indicate that the connection has closed.
        """
        super(_EventClient, self).close(code, reason)
        with self._closing_lock:
            # From here on, received_message() drops everything.
            self._closing = True
        # Block (up to timeout) until closed() has fired.
        self._closed.wait(timeout=timeout)

    def subscribe(self, f, last_log_id=None):
        """Send a subscribe request for filter list ``f``."""
        request = {"method": "subscribe", "filters": f}
        if last_log_id is not None:
            request["last_log_id"] = last_log_id
        payload = json.dumps(request)
        self.send(payload)

    def unsubscribe(self, f):
        """Send an unsubscribe request for filter list ``f``."""
        payload = json.dumps({"method": "unsubscribe", "filters": f})
        self.send(payload)
84
85
class EventClient(object):
    """Websocket event subscription that reconnects automatically.

    Wraps an _EventClient connection. When that connection closes without
    close() having been called, on_closed() builds a new one with the same
    filters and the most recently seen event ID.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        self.url = url
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event_cb = on_event_cb
        # ID of the newest event delivered so far; used when reconnecting.
        self.last_log_id = last_log_id
        # Set by close() (or by on_closed() after giving up). Distinguishes
        # a deliberate close from an unexpected disconnect.
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        """Create and connect a new _EventClient, storing it in self.ec.

        On connection failure, close_connection() is called on the new
        client and the exception is re-raised.
        """
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        """Add filter list ``f`` and subscribe to it on the live connection."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Remove filter list ``f``; raises ValueError if not subscribed."""
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Close the connection; suppresses automatic reconnection."""
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Record the event ID (if any) and pass the message to the callback.

        An exception from the callback is logged and then raised in the main
        thread as KeyboardInterrupt via _thread.interrupt_main().
        """
        event_id = m.get('id')
        if event_id is not None:
            self.last_log_id = event_id
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def on_closed(self):
        """Reconnect after an unexpected close, with retries and backoff.

        If every attempt fails, the client is marked closed and the main
        thread is interrupted.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            if self.is_closed.is_set():
                # close() was called while we were retrying; stop quietly
                # instead of eventually interrupting the main thread.
                return
            try:
                self._setup_event_client()
                _logger.warning("Reconnect successful.")
                break
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
        else:
            # Reached only when no attempt succeeded (no ``break``). The
            # previous ``tries_left == 0`` test also fired when the *last*
            # attempt succeeded.
            _logger.error("EventClient thread could not contact websocket server.")
            self.is_closed.set()
            _thread.interrupt_main()

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass
149
150
class PollClient(threading.Thread):
    """Deliver Arvados log events by polling the API server.

    Fallback for when websockets are unavailable. Provides the same
    subscribe/unsubscribe/close/run_forever methods as EventClient and
    runs as a daemon thread once started.
    """

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        super(PollClient, self).__init__()
        self.api = api
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        self._closing_lock = threading.RLock()

        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection.
            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see _request_items()).
            self._skip_old_events = False

    def _request_items(self, f):
        """Make one API request for new events matching filter list ``f``.

        self._skip_old_events is updated only after a request succeeds, so
        a failed first request is retried as a first request instead of
        replaying old events in ascending order.

        Returns a dict with ``items`` and ``items_available`` keys. Any
        exception from execute() (e.g. errors.ApiError) propagates.
        """
        if self._skip_old_events:
            # Either we know the most recent matching ID, or we know there
            # were no matching events in the 2-hour window before
            # subscribing. Either way we can safely ask for events in
            # ascending order.
            return self.api.logs().list(
                order="id asc",
                filters=f+self._skip_old_events).execute()

        # If the caller didn't provide a known recent ID, our first request
        # asks for the single most recent event from the last 2 hours (the
        # time restriction avoids an expensive database query, and leaves a
        # big enough margin to account for clock skew). If we find one, we
        # remember its ID but discard it (we are supposed to return
        # new/current events, not old ones). Subsequent requests get events
        # in chronological order, filtered on that same cutoff time or
        # (once we see our first matching event) the last-seen ID.
        cutoff = [[
            "created_at", ">=",
            time.strftime(
                "%Y-%m-%dT%H:%M:%SZ",
                time.gmtime(time.time()-7200))]]
        items = self.api.logs().list(
            order="id desc",
            limit=1,
            filters=f+cutoff).execute()
        if items["items"]:
            self._skip_old_events = [
                ["id", ">", str(items["items"][0]["id"])]]
            return {
                "items": [],
                "items_available": 0,
            }
        self._skip_old_events = cutoff
        return items

    def run(self):
        """Poll for events until close() is called or the API is unreachable."""
        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            # Iterate over a snapshot: subscribe()/unsubscribe() may change
            # self.filters from another thread while we poll.
            # NOTE(review): self._skip_old_events is one cursor shared by
            # all filters, so a later filter's events with ids below the
            # cursor advanced by an earlier filter in the same cycle may be
            # skipped -- confirm whether per-filter cursors are needed.
            for f in list(self.filters):
                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    if self._closing.is_set():
                        # Closed while retrying: stop without reporting an
                        # API failure or interrupting the main thread.
                        return
                    try:
                        items = self._request_items(f)
                        break
                    except errors.ApiError as error:
                        _logger.warning("Error '%s' polling API server (%d tries left).",
                                        error, tries_left)
                else:
                    # Reached only when no attempt succeeded (no ``break``).
                    _logger.error("PollClient thread could not contact API server.")
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    self._skip_old_events = [["id", ">", str(i["id"])]]
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """
        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f):
        """Add filter list ``f``; it is polled starting with the next cycle."""
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Remove filter list ``f``; raises ValueError if not subscribed."""
        self.filters.remove(f)
290
291
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    """Open an EventClient against the API server's websocket endpoint.

    The endpoint is read from the ``websocketUrl`` key of api._rootDesc and
    the API token is appended as a query parameter. Raises
    errors.FeatureNotEnabledError when no endpoint is advertised. Any error
    from EventClient is logged and re-raised.
    """
    endpoint = api._rootDesc.get('websocketUrl', None)
    if endpoint:
        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
        try:
            return EventClient(uri_with_token, filters, on_event, last_log_id)
        except Exception:
            _logger.warning("Failed to connect to websockets on %s" % endpoint)
            raise
    raise errors.FeatureNotEnabledError(
        "Server does not advertise a websocket endpoint")
305
306
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """Subscribe to Arvados log events, preferring websockets over polling.

    :api:
      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available (or ARVADOS_DISABLE_WEBSOCKETS is true), fall back to polling every N seconds.  If poll_fallback is false, no fallback is attempted: ARVADOS_DISABLE_WEBSOCKETS is not consulted, and any websocket setup error (e.g. errors.FeatureNotEnabledError) is raised to the caller.
    :last_log_id:
      ID of the last log entry already seen. The poll client requests only entries with a greater id; the websocket client sends it to the server as last_log_id.

    Returns an EventClient or an already-started PollClient; both provide
    subscribe(), unsubscribe(), close() and run_forever().
    """

    if not poll_fallback:
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        else:
            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        _logger.warning("Falling back to polling after websocket error: %s", e)
    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    p.start()
    return p