5737: Fix test so call counts are as expected
[arvados.git] / sdk / python / arvados / events.py
1 import arvados
2 import config
3 import errors
4 from retry import RetryLoop
5
6 import logging
7 import json
8 import thread
9 import threading
10 import time
11 import os
12 import re
13 import ssl
14 from ws4py.client.threadedclient import WebSocketClient
15
# Module-level logger used by the client classes and helper functions below.
_logger = logging.getLogger("arvados.events")
17
18
class _EventClient(WebSocketClient):
    """Websocket client that subscribes to filters and forwards messages.

    Each received message is JSON-decoded and handed to ``on_event``
    until close() has been called.  ``on_closed`` is called (with no
    arguments) from closed().
    """

    def __init__(self, url, filters, on_event, last_log_id, on_closed):
        """Prepare the connection options and remember the callbacks.

        :url: websocket URL to connect to.
        :filters: iterable of filters; each one is subscribed in opened().
        :on_event: callable receiving every decoded message.
        :last_log_id: sent along with the initial subscriptions when
          it is not None.
        :on_closed: callable invoked from closed().
        """
        ca_certs = arvados.util.ca_certs_path()
        insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        ssl_options = {
            'ca_certs': ca_certs,
            'cert_reqs': ssl.CERT_NONE if insecure else ssl.CERT_REQUIRED,
        }

        # Warning: If the host part of url resolves to both IPv6 and
        # IPv4 addresses (common with "localhost"), only one of them
        # will be attempted -- and it might not be the right one. See
        # ws4py's WebSocketBaseClient.__init__.
        super(_EventClient, self).__init__(url, ssl_options=ssl_options)

        self.on_event = on_event
        self.on_closed = on_closed
        self.filters = filters
        self.last_log_id = last_log_id
        # _closing (guarded by _closing_lock) stops message delivery;
        # _closed is set once closed() has run.
        self._closing = False
        self._closing_lock = threading.RLock()
        self._closed = threading.Event()

    def opened(self):
        """Subscribe to every configured filter, passing last_log_id."""
        for event_filter in self.filters:
            self.subscribe(event_filter, self.last_log_id)

    def closed(self, code, reason=None):
        """Record that the connection is closed, then notify on_closed."""
        self._closed.set()
        self.on_closed()

    def received_message(self, m):
        """Decode message ``m`` as JSON and deliver it unless closing."""
        with self._closing_lock:
            if self._closing:
                return
            self.on_event(json.loads(str(m)))

    def close(self, code=1000, reason='', timeout=0):
        """Close event client and optionally wait for it to finish.

        :timeout: is the number of seconds to wait for ws4py to
        indicate that the connection has closed.
        """
        super(_EventClient, self).close(code, reason)
        # From here on, received_message() drops anything that arrives.
        with self._closing_lock:
            self._closing = True
        # Block until closed() has run, or until the timeout expires.
        self._closed.wait(timeout=timeout)

    def subscribe(self, f, last_log_id=None):
        """Send a subscribe request for filter ``f``.

        ``last_log_id`` is included in the request only when it is
        not None.
        """
        request = {"method": "subscribe", "filters": f}
        if last_log_id is not None:
            request["last_log_id"] = last_log_id
        self.send(json.dumps(request))

    def unsubscribe(self, f):
        """Send an unsubscribe request for filter ``f``."""
        request = {"method": "unsubscribe", "filters": f}
        self.send(json.dumps(request))
75
76
class EventClient(object):
    """Websocket event subscription that reconnects after unexpected closes.

    Wraps an _EventClient (``self.ec``) and replaces it with a fresh one
    whenever the connection closes without close() having been called.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        """Connect to ``url`` and subscribe to the initial filter.

        :url: websocket URL to connect to.
        :filters: initial filter list; a false value subscribes with a
          single empty filter list instead.
        :on_event_cb: callable invoked with every decoded message.
        :last_log_id: passed along with the initial subscription, and
          updated as messages carrying an 'id' arrive.

        Any exception raised while connecting is propagated.
        """
        self.url = url
        # self.filters is always a list of filter lists.
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        # Set by close(), or after reconnecting has failed for good.
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        """Create a new _EventClient in self.ec and connect it.

        If connecting fails, the half-open connection is closed and the
        exception is re-raised.
        """
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        """Remember filter ``f`` and subscribe to it on the live connection."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Forget filter ``f`` and unsubscribe from it.

        Raises ValueError if ``f`` is not a current filter.
        """
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Close the connection; see _EventClient.close for ``timeout``."""
        # Set the flag first so on_closed() does not try to reconnect.
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Track the newest log id and pass message ``m`` to the callback.

        An exception from the callback is logged and the main thread is
        interrupted; it is not re-raised here.
        """
        if m.get('id') is not None:
            self.last_log_id = m.get('id')
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            thread.interrupt_main()

    def on_closed(self):
        """Reconnect after a close that was not requested through close().

        If no reconnect attempt succeeds, mark the client closed and
        interrupt the main thread.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for _tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            try:
                self._setup_event_client()
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
            else:
                _logger.warning("Reconnect successful.")
                return
        # Reached only when the loop ran out without a successful
        # reconnect.  Checking the loop counter against zero afterwards
        # cannot tell that apart from success on the final attempt.
        # No exception is being handled here, so log with error().
        _logger.error("EventClient thread could not contact websocket server.")
        self.is_closed.set()
        thread.interrupt_main()

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass
140
141
class PollClient(threading.Thread):
    """Thread that polls the API server's logs and reports rows to a callback.

    Offers the same subscribe/unsubscribe/close/run_forever methods as
    EventClient so the two can be used interchangeably by callers.
    """

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        """Set up the poller; call start() to begin polling.

        :api: client object used to issue logs().list() requests.
        :filters: initial filter list; a false value polls with a
          single empty filter list instead.
        :on_event: callable invoked with each log row, and with
          {'status': 200} once polling is set up.
        :poll_time: seconds to wait between polls when no more rows
          are pending; also passed to RetryLoop as max_wait.
        :last_log_id: if not None, polling starts after this log id;
          otherwise it starts after the newest existing row.
        """
        super(PollClient, self).__init__()
        self.api = api
        # self.filters is always a list of filter lists.
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        # Held while on_event runs, so close() waits for a running handler.
        self._closing_lock = threading.RLock()

    def _list_logs(self, **list_args):
        """Run logs().list(**list_args).execute(), retrying on ApiError.

        Returns the response.  If every attempt fails, marks the client
        as closing, interrupts the main thread and returns None; the
        caller should then simply return.
        """
        for _tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
            try:
                return self.api.logs().list(**list_args).execute()
            except errors.ApiError as error:
                _logger.warning("Error '%s' while polling API server.", error)
        # Reached only when no attempt succeeded.  No exception is being
        # handled at this point, so log with error() rather than exception().
        _logger.error("PollClient thread could not contact API server.")
        with self._closing_lock:
            self._closing.set()
        thread.interrupt_main()
        return None

    def run(self):
        """Thread body: find the starting log id, then poll until closed."""
        self.id = 0
        if self.last_log_id is not None:
            self.id = self.last_log_id
        else:
            # Start after the newest existing row matching any filter.
            for f in self.filters:
                response = self._list_logs(limit=1, order="id desc", filters=f)
                if response is None:
                    return
                items = response['items']
                if items and items[0]['id'] > self.id:
                    self.id = items[0]['id']

        self.on_event({'status': 200})

        while not self._closing.is_set():
            max_id = self.id
            moreitems = False
            for f in self.filters:
                response = self._list_logs(
                    order="id asc",
                    filters=f+[["id", ">", str(self.id)]])
                if response is None:
                    return
                for i in response["items"]:
                    if i['id'] > max_id:
                        max_id = i['id']
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            thread.interrupt_main()
                if response["items_available"] > len(response["items"]):
                    moreitems = True
            self.id = max_id
            # Poll again immediately while the server reports more rows
            # than it returned; otherwise wait (close() ends the wait early).
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block until the client is closing."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f):
        """Acknowledge with {'status': 200} and add filter ``f`` to the poll set."""
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Stop polling filter ``f``; raises ValueError if it is not present."""
        self.filters.remove(f)
255
256
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    """Open a websocket EventClient for ``api`` and return it.

    The endpoint is read from the 'websocketUrl' entry of
    api._rootDesc, and api.api_token is appended as the api_token
    query parameter.

    Raises errors.FeatureNotEnabledError when no endpoint is
    advertised.  Any exception raised while creating the EventClient
    is logged and re-raised.
    """
    endpoint = api._rootDesc.get('websocketUrl')
    if endpoint:
        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
        try:
            return EventClient(uri_with_token, filters, on_event, last_log_id)
        except Exception:
            _logger.warn("Failed to connect to websockets on %s" % endpoint)
            raise
    raise errors.FeatureNotEnabledError(
        "Server does not advertise a websocket endpoint")
270
271
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """Subscribe to events, preferring websockets and falling back to polling.

    :api:
      a client object retrieved from arvados.api(). The caller should not
      use this client object for anything else after calling subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available, fall back to polling every N
      seconds.  If poll_fallback is false, only websockets are tried and
      any error from setting them up is raised to the caller.
    :last_log_id:
      Log rows that are newer than the log id

    Returns the websocket client on success, otherwise a started
    PollClient.
    """
    # No fallback requested: websockets or nothing, errors propagate.
    if not poll_fallback:
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
        else:
            return _subscribe_websocket(api, filters, on_event, last_log_id)
    except Exception as e:
        _logger.warn("Falling back to polling after websocket error: %s" % e)

    # Websockets were disabled or failed: poll instead.
    poller = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    poller.start()
    return poller