Merge branch '9417-asserttrue-for-value-comparison'
[arvados.git] / sdk / python / arvados / events.py
1 import arvados
2 import config
3 import errors
4 from retry import RetryLoop
5
6 import logging
7 import json
8 import thread
9 import threading
10 import time
11 import os
12 import re
13 import ssl
14 from ws4py.client.threadedclient import WebSocketClient
15
16 _logger = logging.getLogger('arvados.events')
17
18
class _EventClient(WebSocketClient):
    """Websocket client that hands decoded JSON messages to a callback.

    When the connection opens, one "subscribe" request is sent for each
    filter.  Messages received after close() has been called are dropped.
    """

    def __init__(self, url, filters, on_event, last_log_id, on_closed):
        """Set up certificate checking and remember the callbacks.

        :url: websocket URL passed to the base class.
        :filters: iterable of filters; each is subscribed in opened().
        :on_event: called with every decoded JSON message.
        :last_log_id: sent with the initial subscriptions unless None.
        :on_closed: called without arguments from closed().
        """
        ca_bundle = arvados.util.ca_certs_path()
        if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
            verify_mode = ssl.CERT_NONE
        else:
            verify_mode = ssl.CERT_REQUIRED
        tls_settings = {'ca_certs': ca_bundle, 'cert_reqs': verify_mode}

        # Warning: If the host part of url resolves to both IPv6 and
        # IPv4 addresses (common with "localhost"), only one of them
        # will be attempted -- and it might not be the right one. See
        # ws4py's WebSocketBaseClient.__init__.
        super(_EventClient, self).__init__(url, ssl_options=tls_settings)

        self.filters = filters
        self.on_event = on_event
        self.on_closed = on_closed
        self.last_log_id = last_log_id
        # _closing is flipped under _closing_lock so that close() cannot
        # race with a message that is being delivered.
        self._closing_lock = threading.RLock()
        self._closing = False
        # Set once closed() has run; close() waits on it.
        self._closed = threading.Event()

    def opened(self):
        """Send a subscribe request for every configured filter."""
        for initial_filter in self.filters:
            self.subscribe(initial_filter, self.last_log_id)

    def closed(self, code, reason=None):
        """Record that the connection is closed, then notify on_closed."""
        self._closed.set()
        self.on_closed()

    def received_message(self, m):
        """Decode a message as JSON and deliver it unless we are closing."""
        with self._closing_lock:
            if self._closing:
                return
            self.on_event(json.loads(str(m)))

    def close(self, code=1000, reason='', timeout=0):
        """Close event client and optionally wait for it to finish.

        :timeout: is the number of seconds to wait for ws4py to
        indicate that the connection has closed.
        """
        super(_EventClient, self).close(code, reason)
        # From here on received_message() discards everything.
        with self._closing_lock:
            self._closing = True
        # Block (up to timeout) until closed() has been invoked.
        self._closed.wait(timeout=timeout)

    def subscribe(self, f, last_log_id=None):
        """Send a "subscribe" request for filter f.

        last_log_id is included in the request only when it is not None.
        """
        request = {"method": "subscribe", "filters": f}
        if last_log_id is not None:
            request["last_log_id"] = last_log_id
        self.send(json.dumps(request))

    def unsubscribe(self, f):
        """Send an "unsubscribe" request for filter f."""
        request = {"method": "unsubscribe", "filters": f}
        self.send(json.dumps(request))
75
76
class EventClient(object):
    """Websocket event subscription that reconnects after unexpected closes.

    Wraps an _EventClient, remembers the active filters and the id of the
    most recent event so that a replacement connection can be created with
    the same subscriptions.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        """Connect to url and subscribe to the initial filter.

        :url: websocket URL.
        :filters: initial filter; a false value subscribes with an empty
          filter ([]).
        :on_event_cb: called with each decoded event.
        :last_log_id: passed along with the initial subscription.

        Exceptions raised while connecting propagate to the caller.
        """
        self.url = url
        self.filters = [filters] if filters else [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        # Set by close() (or after reconnecting has failed for good);
        # run_forever() waits on it.
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        """Create a fresh _EventClient and connect it.

        If connecting raises, the underlying connection is closed and the
        exception is re-raised.
        """
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        """Remember filter f and send a subscribe request for it."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Forget filter f and send an unsubscribe request for it.

        Raises ValueError if f is not a current filter.
        """
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Mark the client closed and close the websocket connection.

        is_closed is set first so that on_closed() does not try to
        reconnect.  The arguments are forwarded to _EventClient.close().
        """
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Record the event id (if any) and invoke the user callback.

        An exception from the callback is logged and the main thread is
        interrupted; it is not propagated.
        """
        event_id = m.get('id')
        if event_id is not None:
            self.last_log_id = event_id
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            thread.interrupt_main()

    def on_closed(self):
        """Reconnect after a close that was not requested through close().

        If every reconnect attempt fails, the client is marked closed and
        the main thread is interrupted.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            try:
                self._setup_event_client()
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
            else:
                break
        else:
            # Reached only when the loop ran out of attempts without a
            # successful reconnect (no break).  Checking the last loop
            # value instead would misreport a success on the final attempt.
            _logger.exception("EventClient thread could not contact websocket server.")
            self.is_closed.set()
            thread.interrupt_main()

    def run_forever(self):
        """Block until the client has been closed."""
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass
139
140
class PollClient(threading.Thread):
    """Daemon thread that delivers log events by polling the API server."""

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        """Prepare the polling thread (it is not started here).

        :api: API client object used for logs().list() requests.
        :filters: initial filter; a false value polls with an empty
          filter ([]).
        :on_event: called with each log item, and with {'status': 200}
          when polling starts and on subscribe().
        :poll_time: seconds to wait between polls; also the maximum wait
          between retries of a failed request.
        :last_log_id: if not None, only items with a greater id are
          fetched; otherwise polling starts after the newest existing item.
        """
        super(PollClient, self).__init__()
        self.api = api
        self.filters = [filters] if filters else [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        # Held while an event is being delivered so that close() waits
        # for a running on_event handler.
        self._closing_lock = threading.RLock()

    def _list_logs_with_retries(self, **list_args):
        """Run api.logs().list(**list_args).execute(), retrying on ApiError.

        Returns the response.  If every attempt fails, logs the failure,
        marks the client as closing, interrupts the main thread and
        returns None.
        """
        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
            try:
                return self.api.logs().list(**list_args).execute()
            except errors.ApiError:
                # Try again on the next iteration of the retry loop.
                pass
        # Only reached when no attempt succeeded; a success on the very
        # last attempt returns above and is not mistaken for a failure.
        _logger.exception("PollClient thread could not contact API server.")
        with self._closing_lock:
            self._closing.set()
        thread.interrupt_main()
        return None

    def run(self):
        """Thread body: find the starting id, then poll until closed."""
        self.id = 0
        if self.last_log_id is not None:
            self.id = self.last_log_id
        else:
            # Start after the newest existing item matching any filter.
            for f in self.filters:
                response = self._list_logs_with_retries(
                    limit=1, order="id desc", filters=f)
                if response is None:
                    return
                items = response['items']
                if items and items[0]['id'] > self.id:
                    self.id = items[0]['id']

        self.on_event({'status': 200})

        while not self._closing.is_set():
            max_id = self.id
            moreitems = False
            for f in self.filters:
                response = self._list_logs_with_retries(
                    order="id asc", filters=f+[["id", ">", str(self.id)]])
                if response is None:
                    return
                for item in response["items"]:
                    if item['id'] > max_id:
                        max_id = item['id']
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(item)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            thread.interrupt_main()
                if response["items_available"] > len(response["items"]):
                    # The server has more matching items than it returned:
                    # poll again right away instead of sleeping.
                    moreitems = True
            self.id = max_id
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block until the client has been closed."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f):
        """Report {'status': 200} to on_event and start polling filter f."""
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Stop polling filter f.

        Raises ValueError if f is not a current filter.
        """
        self.filters.remove(f)
254
255
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    """Create an EventClient for the websocket endpoint advertised by api.

    The endpoint is read from the 'websocketUrl' entry of api._rootDesc,
    and api.api_token is appended to it as the api_token query parameter.

    Returns the connected EventClient.

    Raises errors.FeatureNotEnabledError if no endpoint is advertised.
    Any exception raised while creating the EventClient is logged as a
    warning and re-raised.
    """
    endpoint = api._rootDesc.get('websocketUrl')
    if not endpoint:
        raise errors.FeatureNotEnabledError(
            "Server does not advertise a websocket endpoint")
    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
    try:
        return EventClient(uri_with_token, filters, on_event, last_log_id)
    except Exception:
        # Log the bare endpoint rather than uri_with_token so that the
        # API token does not end up in the log.
        _logger.warning("Failed to connect to websockets on %s", endpoint)
        raise
269
270
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """Subscribe to events, using websockets or (as a fallback) polling.

    :api:
      a client object retrieved from arvados.api(). The caller should not
      use this client object for anything else after calling subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available, fall back to polling every N
      seconds.  If poll_fallback is false, no fallback takes place: the
      websocket client is returned, or the error raised while setting it
      up propagates to the caller.
    :last_log_id:
      If given, it is passed to the client so that events start after
      this log id (the polling client only fetches rows whose id is
      greater; the websocket client sends it with its subscribe request).

    Returns an EventClient, or a started PollClient when falling back to
    polling.
    """

    if not poll_fallback:
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        # Any websocket setup failure falls through to polling.
        _logger.warning("Falling back to polling after websocket error: %s", e)
    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    p.start()
    return p