Merge branch '13330-collection-save'
[arvados.git] / sdk / python / arvados / events.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future import standard_library
7 standard_library.install_aliases()
8 from builtins import str
9 from builtins import object
10 import arvados
11 from . import config
12 from . import errors
13 from .retry import RetryLoop
14
15 import logging
16 import json
17 import _thread
18 import threading
19 import time
20 import os
21 import re
22 import ssl
23 from ws4py.client.threadedclient import WebSocketClient
24
25 _logger = logging.getLogger('arvados.events')
26
27
28 class _EventClient(WebSocketClient):
29     def __init__(self, url, filters, on_event, last_log_id, on_closed):
30         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
31         if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
32             ssl_options['cert_reqs'] = ssl.CERT_NONE
33         else:
34             ssl_options['cert_reqs'] = ssl.CERT_REQUIRED
35
36         # Warning: If the host part of url resolves to both IPv6 and
37         # IPv4 addresses (common with "localhost"), only one of them
38         # will be attempted -- and it might not be the right one. See
39         # ws4py's WebSocketBaseClient.__init__.
40         super(_EventClient, self).__init__(url, ssl_options=ssl_options)
41
42         self.filters = filters
43         self.on_event = on_event
44         self.last_log_id = last_log_id
45         self._closing_lock = threading.RLock()
46         self._closing = False
47         self._closed = threading.Event()
48         self.on_closed = on_closed
49
50     def opened(self):
51         for f in self.filters:
52             self.subscribe(f, self.last_log_id)
53
54     def closed(self, code, reason=None):
55         self._closed.set()
56         self.on_closed()
57
58     def received_message(self, m):
59         with self._closing_lock:
60             if not self._closing:
61                 self.on_event(json.loads(str(m)))
62
63     def close(self, code=1000, reason='', timeout=0):
64         """Close event client and optionally wait for it to finish.
65
66         :timeout: is the number of seconds to wait for ws4py to
67         indicate that the connection has closed.
68         """
69         super(_EventClient, self).close(code, reason)
70         with self._closing_lock:
71             # make sure we don't process any more messages.
72             self._closing = True
73         # wait for ws4py to tell us the connection is closed.
74         self._closed.wait(timeout=timeout)
75
76     def subscribe(self, f, last_log_id=None):
77         m = {"method": "subscribe", "filters": f}
78         if last_log_id is not None:
79             m["last_log_id"] = last_log_id
80         self.send(json.dumps(m))
81
82     def unsubscribe(self, f):
83         self.send(json.dumps({"method": "unsubscribe", "filters": f}))
84
85
86 class EventClient(object):
87     def __init__(self, url, filters, on_event_cb, last_log_id):
88         self.url = url
89         if filters:
90             self.filters = [filters]
91         else:
92             self.filters = [[]]
93         self.on_event_cb = on_event_cb
94         self.last_log_id = last_log_id
95         self.is_closed = threading.Event()
96         self._setup_event_client()
97
98     def _setup_event_client(self):
99         self.ec = _EventClient(self.url, self.filters, self.on_event,
100                                self.last_log_id, self.on_closed)
101         self.ec.daemon = True
102         try:
103             self.ec.connect()
104         except Exception:
105             self.ec.close_connection()
106             raise
107
108     def subscribe(self, f, last_log_id=None):
109         self.filters.append(f)
110         self.ec.subscribe(f, last_log_id)
111
112     def unsubscribe(self, f):
113         del self.filters[self.filters.index(f)]
114         self.ec.unsubscribe(f)
115
116     def close(self, code=1000, reason='', timeout=0):
117         self.is_closed.set()
118         self.ec.close(code, reason, timeout)
119
120     def on_event(self, m):
121         if m.get('id') != None:
122             self.last_log_id = m.get('id')
123         try:
124             self.on_event_cb(m)
125         except Exception as e:
126             _logger.exception("Unexpected exception from event callback.")
127             _thread.interrupt_main()
128
129     def on_closed(self):
130         if not self.is_closed.is_set():
131             _logger.warning("Unexpected close. Reconnecting.")
132             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
133                 try:
134                     self._setup_event_client()
135                     _logger.warning("Reconnect successful.")
136                     break
137                 except Exception as e:
138                     _logger.warning("Error '%s' during websocket reconnect.", e)
139             if tries_left == 0:
140                 _logger.exception("EventClient thread could not contact websocket server.")
141                 self.is_closed.set()
142                 _thread.interrupt_main()
143                 return
144
145     def run_forever(self):
146         # Have to poll here to let KeyboardInterrupt get raised.
147         while not self.is_closed.wait(1):
148             pass
149
150
151 class PollClient(threading.Thread):
152     def __init__(self, api, filters, on_event, poll_time, last_log_id):
153         super(PollClient, self).__init__()
154         self.api = api
155         if filters:
156             self.filters = [filters]
157         else:
158             self.filters = [[]]
159         self.on_event = on_event
160         self.poll_time = poll_time
161         self.daemon = True
162         self.last_log_id = last_log_id
163         self._closing = threading.Event()
164         self._closing_lock = threading.RLock()
165
166     def run(self):
167         if self.last_log_id != None:
168             # Caller supplied the last-seen event ID from a previous
169             # connection
170             skip_old_events = [["id", ">", str(self.last_log_id)]]
171         else:
172             # We need to do a reverse-order query to find the most
173             # recent event ID (see "if not skip_old_events" below).
174             skip_old_events = False
175
176         self.on_event({'status': 200})
177
178         while not self._closing.is_set():
179             moreitems = False
180             for f in self.filters:
181                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
182                     try:
183                         if not skip_old_events:
184                             # If the caller didn't provide a known
185                             # recent ID, our first request will ask
186                             # for the single most recent event from
187                             # the last 2 hours (the time restriction
188                             # avoids doing an expensive database
189                             # query, and leaves a big enough margin to
190                             # account for clock skew). If we do find a
191                             # recent event, we remember its ID but
192                             # then discard it (we are supposed to be
193                             # returning new/current events, not old
194                             # ones).
195                             #
196                             # Subsequent requests will get multiple
197                             # events in chronological order, and
198                             # filter on that same cutoff time, or
199                             # (once we see our first matching event)
200                             # the ID of the last-seen event.
201                             skip_old_events = [[
202                                 "created_at", ">=",
203                                 time.strftime(
204                                     "%Y-%m-%dT%H:%M:%SZ",
205                                     time.gmtime(time.time()-7200))]]
206                             items = self.api.logs().list(
207                                 order="id desc",
208                                 limit=1,
209                                 filters=f+skip_old_events).execute()
210                             if items["items"]:
211                                 skip_old_events = [
212                                     ["id", ">", str(items["items"][0]["id"])]]
213                                 items = {
214                                     "items": [],
215                                     "items_available": 0,
216                                 }
217                         else:
218                             # In this case, either we know the most
219                             # recent matching ID, or we know there
220                             # were no matching events in the 2-hour
221                             # window before subscribing. Either way we
222                             # can safely ask for events in ascending
223                             # order.
224                             items = self.api.logs().list(
225                                 order="id asc",
226                                 filters=f+skip_old_events).execute()
227                         break
228                     except errors.ApiError as error:
229                         pass
230                     else:
231                         tries_left = 0
232                         break
233                 if tries_left == 0:
234                     _logger.exception("PollClient thread could not contact API server.")
235                     with self._closing_lock:
236                         self._closing.set()
237                     _thread.interrupt_main()
238                     return
239                 for i in items["items"]:
240                     skip_old_events = [["id", ">", str(i["id"])]]
241                     with self._closing_lock:
242                         if self._closing.is_set():
243                             return
244                         try:
245                             self.on_event(i)
246                         except Exception as e:
247                             _logger.exception("Unexpected exception from event callback.")
248                             _thread.interrupt_main()
249                 if items["items_available"] > len(items["items"]):
250                     moreitems = True
251             if not moreitems:
252                 self._closing.wait(self.poll_time)
253
254     def run_forever(self):
255         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
256         while not self._closing.is_set():
257             self._closing.wait(1)
258
259     def close(self, code=None, reason=None, timeout=0):
260         """Close poll client and optionally wait for it to finish.
261
262         If an :on_event: handler is running in a different thread,
263         first wait (indefinitely) for it to return.
264
265         After closing, wait up to :timeout: seconds for the thread to
266         finish the poll request in progress (if any).
267
268         :code: and :reason: are ignored. They are present for
269         interface compatibility with EventClient.
270         """
271
272         with self._closing_lock:
273             self._closing.set()
274         try:
275             self.join(timeout=timeout)
276         except RuntimeError:
277             # "join() raises a RuntimeError if an attempt is made to join the
278             # current thread as that would cause a deadlock. It is also an
279             # error to join() a thread before it has been started and attempts
280             # to do so raises the same exception."
281             pass
282
283     def subscribe(self, f):
284         self.on_event({'status': 200})
285         self.filters.append(f)
286
287     def unsubscribe(self, f):
288         del self.filters[self.filters.index(f)]
289
290
291 def _subscribe_websocket(api, filters, on_event, last_log_id=None):
292     endpoint = api._rootDesc.get('websocketUrl', None)
293     if not endpoint:
294         raise errors.FeatureNotEnabledError(
295             "Server does not advertise a websocket endpoint")
296     uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
297     try:
298         client = EventClient(uri_with_token, filters, on_event, last_log_id)
299     except Exception:
300         _logger.warning("Failed to connect to websockets on %s" % endpoint)
301         raise
302     else:
303         return client
304
305
306 def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
307     """
308     :api:
309       a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
310     :filters:
311       Initial subscription filters.
312     :on_event:
313       The callback when a message is received.
314     :poll_fallback:
315       If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
316     :last_log_id:
317       Log rows that are newer than the log id
318     """
319
320     if not poll_fallback:
321         return _subscribe_websocket(api, filters, on_event, last_log_id)
322
323     try:
324         if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
325             return _subscribe_websocket(api, filters, on_event, last_log_id)
326         else:
327             _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
328     except Exception as e:
329         _logger.warning("Falling back to polling after websocket error: %s" % e)
330     p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
331     p.start()
332     return p