54f3019f36b7b3e5544e3780afb730d61c44d8fd
[arvados.git] / sdk / python / arvados / events.py
1 import arvados
2 import config
3 import errors
4 from retry import RetryLoop
5
6 import logging
7 import json
8 import thread
9 import threading
10 import time
11 import os
12 import re
13 import ssl
14 from ws4py.client.threadedclient import WebSocketClient
15
16 _logger = logging.getLogger('arvados.events')
17
18
class _EventClient(WebSocketClient):
    """ws4py client for a single websocket connection to the event service.

    Every filter in ``filters`` is subscribed as soon as the connection
    opens.  Each incoming message is decoded from JSON and handed to
    ``on_event`` until ``close()`` has been called.  ``on_closed`` is
    called with no arguments from the ``closed()`` hook.
    """

    def __init__(self, url, filters, on_event, last_log_id, on_closed):
        ca_bundle = arvados.util.ca_certs_path()
        skip_verification = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        ssl_options = {
            'ca_certs': ca_bundle,
            'cert_reqs': ssl.CERT_NONE if skip_verification else ssl.CERT_REQUIRED,
        }

        # Warning: If the host part of url resolves to both IPv6 and
        # IPv4 addresses (common with "localhost"), only one of them
        # will be attempted -- and it might not be the right one. See
        # ws4py's WebSocketBaseClient.__init__.
        super(_EventClient, self).__init__(url, ssl_options=ssl_options)

        # Callbacks supplied by the owner of this connection.
        self.on_event = on_event
        self.on_closed = on_closed

        # Subscription state replayed by opened().
        self.filters = filters
        self.last_log_id = last_log_id

        # Shutdown bookkeeping: _closing stops message delivery (guarded
        # by _closing_lock); _closed is set once closed() has run.
        self._closed = threading.Event()
        self._closing = False
        self._closing_lock = threading.RLock()

    def opened(self):
        """Subscribe every configured filter, starting from last_log_id."""
        for event_filter in self.filters:
            self.subscribe(event_filter, self.last_log_id)

    def closed(self, code, reason=None):
        """Record that the connection is closed, then notify the owner."""
        self._closed.set()
        self.on_closed()

    def received_message(self, m):
        """Decode message ``m`` as JSON and deliver it, unless closing."""
        with self._closing_lock:
            if self._closing:
                return
            event = json.loads(str(m))
            self.on_event(event)

    def close(self, code=1000, reason='', timeout=0):
        """Close event client and optionally wait for it to finish.

        :timeout: is the number of seconds to wait for ws4py to
        indicate that the connection has closed.
        """
        super(_EventClient, self).close(code, reason)
        # From here on received_message() must drop whatever arrives.
        with self._closing_lock:
            self._closing = True
        # Give ws4py up to `timeout` seconds to report the close via closed().
        self._closed.wait(timeout=timeout)

    def subscribe(self, f, last_log_id=None):
        """Send a subscribe request for filter ``f``.

        ``last_log_id`` is included in the request only when it is not None.
        """
        request = {"method": "subscribe", "filters": f}
        if last_log_id is not None:
            request["last_log_id"] = last_log_id
        self.send(json.dumps(request))

    def unsubscribe(self, f):
        """Send an unsubscribe request for filter ``f``."""
        request = {"method": "unsubscribe", "filters": f}
        self.send(json.dumps(request))
75
76
class EventClient(object):
    """Websocket event subscription that reconnects after unexpected closes.

    Wraps an ``_EventClient`` (kept in ``self.ec``).  The list of active
    filters and the id of the most recent event are remembered here so a
    replacement connection can resubscribe from where the old one stopped.

    :url: websocket URL to connect to.
    :filters: initial filter; a false value subscribes with an empty filter.
    :on_event_cb: callable invoked with each decoded event message.
    :last_log_id: log id passed along with the initial subscriptions.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        self.url = url
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        # Set by close(); tells on_closed() the close was intentional.
        self.is_closed = False
        self.ec = self._new_connection()

    def _new_connection(self):
        """Build an unconnected _EventClient from the current state."""
        return _EventClient(self.url, self.filters, self.on_event,
                            self.last_log_id, self.on_closed)

    def connect(self):
        """Connect the underlying websocket client."""
        self.ec.connect()

    def close_connection(self):
        """Forward close_connection() to the underlying websocket client."""
        self.ec.close_connection()

    def subscribe(self, f, last_log_id=None):
        """Remember filter ``f`` and subscribe to it on the live connection."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Forget filter ``f`` and unsubscribe from it.

        Raises ValueError if ``f`` is not a current filter.
        """
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Close the connection for good; no reconnect will be attempted."""
        # Must be set first so on_closed() sees an intentional close.
        self.is_closed = True
        self.ec.close(code, reason, timeout)

    def run_forever(self):
        """Block the calling thread until close() has been called.

        Sleeps in one-second steps instead of blocking indefinitely, the
        same way PollClient.run_forever() does, so the caller can still
        be interrupted.
        """
        while not self.is_closed:
            time.sleep(1)

    def on_event(self, m):
        """Track the newest event id and pass ``m`` to the user callback.

        An exception raised by the callback is logged and the main thread
        is interrupted; it is not propagated to the caller.
        """
        event_id = m.get('id')
        if event_id is not None:
            self.last_log_id = event_id
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            thread.interrupt_main()

    def on_closed(self):
        """Reconnect after a close that was not requested through close().

        Retries every 5 seconds until a connection succeeds or close()
        is called.
        """
        if self.is_closed:
            return
        _logger.warning("Unexpected close. Reconnecting.")
        while not self.is_closed:
            try:
                # Build a fresh client for every attempt: a socket whose
                # connect() failed cannot be relied on for another try.
                self.ec = self._new_connection()
                self.ec.connect()
                break
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect. Will retry after 5s.", e, exc_info=True)
                time.sleep(5)
127
128
class PollClient(threading.Thread):
    """Daemon thread that delivers log events by polling the logs API.

    :api: API client used for the ``logs().list(...)`` requests.
    :filters: initial filter; a false value polls with an empty filter.
    :on_event: callable invoked with each log item, and with
      ``{'status': 200}`` when polling starts and on each subscribe().
    :poll_time: seconds to wait between polls; also used as the retry
      loop's ``max_wait``.
    :last_log_id: if not None, only items with a greater id are reported;
      otherwise polling starts after the newest existing item.
    """

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        super(PollClient, self).__init__()
        self.api = api
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        # _closing asks run() to stop; _closing_lock is held while an
        # on_event handler runs so close() cannot overlap a delivery.
        self._closing = threading.Event()
        self._closing_lock = threading.RLock()

    def _give_up(self):
        """Log the API failure, mark the client closing, interrupt main."""
        _logger.exception("PollClient thread could not contact API server.")
        with self._closing_lock:
            self._closing.set()
        thread.interrupt_main()

    def run(self):
        """Thread body: find the starting id, then poll until closed."""
        self.id = 0
        if self.last_log_id is not None:
            self.id = self.last_log_id
        else:
            # Start after the newest log item matching any filter.
            for f in self.filters:
                for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
                        break
                    except errors.ApiError:
                        pass
                else:
                    # Loop ended without a break: every attempt failed.
                    # (Testing the loop variable against 0 instead would
                    # presumably also match a success on the last attempt.)
                    self._give_up()
                    return
                if items and items[0]['id'] > self.id:
                    self.id = items[0]['id']

        self.on_event({'status': 200})

        while not self._closing.is_set():
            max_id = self.id
            moreitems = False
            for f in self.filters:
                for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
                        break
                    except errors.ApiError:
                        pass
                else:
                    self._give_up()
                    return
                for i in items["items"]:
                    if i['id'] > max_id:
                        max_id = i['id']
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    # The server has more than this page; poll again at once.
                    moreitems = True
            self.id = max_id
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block the calling thread until the client is closing."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f, last_log_id=None):
        """Add filter ``f`` to the set polled on every cycle.

        ``on_event`` is called with ``{'status': 200}`` before the filter
        is added.

        :last_log_id: is ignored. It is present for interface
        compatibility with EventClient.
        """
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Stop polling filter ``f``.

        Raises ValueError if ``f`` is not a current filter.
        """
        self.filters.remove(f)
239
240
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    """Return a connected EventClient for the server's websocket endpoint.

    The endpoint is read from ``api._rootDesc['websocketUrl']`` and the
    API token is appended to it as a query parameter.

    Raises errors.FeatureNotEnabledError when no endpoint is advertised.
    Any failure while creating or connecting the client is logged as a
    warning and re-raised.
    """
    endpoint = api._rootDesc.get('websocketUrl', None)
    if not endpoint:
        raise errors.FeatureNotEnabledError(
            "Server does not advertise a websocket endpoint")
    try:
        ws_url = "{}?api_token={}".format(endpoint, api.api_token)
        ws_client = EventClient(ws_url, filters, on_event, last_log_id)
        try:
            ws_client.connect()
        except:
            # Do not leave a half-open connection behind on failure.
            ws_client.close_connection()
            raise
        return ws_client
    except:
        # Log the bare endpoint only; the connection URL contains the token.
        _logger.warn("Failed to connect to websockets on %s" % endpoint)
        raise
260
261
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """Subscribe to events, by websocket if possible and by polling if not.

    :api:
      a client object retrieved from arvados.api(). The caller should not
      use this client object for anything else after calling subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available, fall back to polling every N
      seconds. If poll_fallback is false, no fallback happens: the
      websocket error is raised to the caller instead, and the
      ARVADOS_DISABLE_WEBSOCKETS setting is not consulted.
    :last_log_id:
      Passed on to the client that is created, as the log id to start
      from.

    Returns the websocket client, or a started PollClient when polling
    is used.
    """

    if not poll_fallback:
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        else:
            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        # Best effort: any websocket failure falls through to polling.
        _logger.warning("Falling back to polling after websocket error: %s", e)
    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    p.start()
    return p