Merge branch '8465-cwl-containers-stdin-stderr' closes #8465
[arvados.git] / sdk / python / arvados / events.py
1 import arvados
2 import config
3 import errors
4 from retry import RetryLoop
5
6 import logging
7 import json
8 import thread
9 import threading
10 import time
11 import os
12 import re
13 import ssl
14 from ws4py.client.threadedclient import WebSocketClient
15
16 _logger = logging.getLogger('arvados.events')
17
18
19 class _EventClient(WebSocketClient):
20     def __init__(self, url, filters, on_event, last_log_id, on_closed):
21         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
22         if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
23             ssl_options['cert_reqs'] = ssl.CERT_NONE
24         else:
25             ssl_options['cert_reqs'] = ssl.CERT_REQUIRED
26
27         # Warning: If the host part of url resolves to both IPv6 and
28         # IPv4 addresses (common with "localhost"), only one of them
29         # will be attempted -- and it might not be the right one. See
30         # ws4py's WebSocketBaseClient.__init__.
31         super(_EventClient, self).__init__(url, ssl_options=ssl_options)
32
33         self.filters = filters
34         self.on_event = on_event
35         self.last_log_id = last_log_id
36         self._closing_lock = threading.RLock()
37         self._closing = False
38         self._closed = threading.Event()
39         self.on_closed = on_closed
40
41     def opened(self):
42         for f in self.filters:
43             self.subscribe(f, self.last_log_id)
44
45     def closed(self, code, reason=None):
46         self._closed.set()
47         self.on_closed()
48
49     def received_message(self, m):
50         with self._closing_lock:
51             if not self._closing:
52                 self.on_event(json.loads(str(m)))
53
54     def close(self, code=1000, reason='', timeout=0):
55         """Close event client and optionally wait for it to finish.
56
57         :timeout: is the number of seconds to wait for ws4py to
58         indicate that the connection has closed.
59         """
60         super(_EventClient, self).close(code, reason)
61         with self._closing_lock:
62             # make sure we don't process any more messages.
63             self._closing = True
64         # wait for ws4py to tell us the connection is closed.
65         self._closed.wait(timeout=timeout)
66
67     def subscribe(self, f, last_log_id=None):
68         m = {"method": "subscribe", "filters": f}
69         if last_log_id is not None:
70             m["last_log_id"] = last_log_id
71         self.send(json.dumps(m))
72
73     def unsubscribe(self, f):
74         self.send(json.dumps({"method": "unsubscribe", "filters": f}))
75
76
77 class EventClient(object):
78     def __init__(self, url, filters, on_event_cb, last_log_id):
79         self.url = url
80         if filters:
81             self.filters = [filters]
82         else:
83             self.filters = [[]]
84         self.on_event_cb = on_event_cb
85         self.last_log_id = last_log_id
86         self.is_closed = threading.Event()
87         self._setup_event_client()
88
89     def _setup_event_client(self):
90         self.ec = _EventClient(self.url, self.filters, self.on_event,
91                                self.last_log_id, self.on_closed)
92         self.ec.daemon = True
93         try:
94             self.ec.connect()
95         except Exception:
96             self.ec.close_connection()
97             raise
98
99     def subscribe(self, f, last_log_id=None):
100         self.filters.append(f)
101         self.ec.subscribe(f, last_log_id)
102
103     def unsubscribe(self, f):
104         del self.filters[self.filters.index(f)]
105         self.ec.unsubscribe(f)
106
107     def close(self, code=1000, reason='', timeout=0):
108         self.is_closed.set()
109         self.ec.close(code, reason, timeout)
110
111     def on_event(self, m):
112         if m.get('id') != None:
113             self.last_log_id = m.get('id')
114         try:
115             self.on_event_cb(m)
116         except Exception as e:
117             _logger.exception("Unexpected exception from event callback.")
118             thread.interrupt_main()
119
120     def on_closed(self):
121         if not self.is_closed.is_set():
122             _logger.warn("Unexpected close. Reconnecting.")
123             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
124                 try:
125                     self._setup_event_client()
126                     _logger.warn("Reconnect successful.")
127                     break
128                 except Exception as e:
129                     _logger.warn("Error '%s' during websocket reconnect.", e)
130             if tries_left == 0:
131                 _logger.exception("EventClient thread could not contact websocket server.")
132                 self.is_closed.set()
133                 thread.interrupt_main()
134                 return
135
136     def run_forever(self):
137         # Have to poll here to let KeyboardInterrupt get raised.
138         while not self.is_closed.wait(1):
139             pass
140
141
142 class PollClient(threading.Thread):
143     def __init__(self, api, filters, on_event, poll_time, last_log_id):
144         super(PollClient, self).__init__()
145         self.api = api
146         if filters:
147             self.filters = [filters]
148         else:
149             self.filters = [[]]
150         self.on_event = on_event
151         self.poll_time = poll_time
152         self.daemon = True
153         self.last_log_id = last_log_id
154         self._closing = threading.Event()
155         self._closing_lock = threading.RLock()
156
157     def run(self):
158         if self.last_log_id != None:
159             # Caller supplied the last-seen event ID from a previous
160             # connection
161             skip_old_events = [["id", ">", str(self.last_log_id)]]
162         else:
163             # We need to do a reverse-order query to find the most
164             # recent event ID (see "if not skip_old_events" below).
165             skip_old_events = False
166
167         self.on_event({'status': 200})
168
169         while not self._closing.is_set():
170             moreitems = False
171             for f in self.filters:
172                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
173                     try:
174                         if not skip_old_events:
175                             # If the caller didn't provide a known
176                             # recent ID, our first request will ask
177                             # for the single most recent event from
178                             # the last 2 hours (the time restriction
179                             # avoids doing an expensive database
180                             # query, and leaves a big enough margin to
181                             # account for clock skew). If we do find a
182                             # recent event, we remember its ID but
183                             # then discard it (we are supposed to be
184                             # returning new/current events, not old
185                             # ones).
186                             #
187                             # Subsequent requests will get multiple
188                             # events in chronological order, and
189                             # filter on that same cutoff time, or
190                             # (once we see our first matching event)
191                             # the ID of the last-seen event.
192                             skip_old_events = [[
193                                 "created_at", ">=",
194                                 time.strftime(
195                                     "%Y-%m-%dT%H:%M:%SZ",
196                                     time.gmtime(time.time()-7200))]]
197                             items = self.api.logs().list(
198                                 order="id desc",
199                                 limit=1,
200                                 filters=f+skip_old_events).execute()
201                             if items["items"]:
202                                 skip_old_events = [
203                                     ["id", ">", str(items["items"][0]["id"])]]
204                                 items = {
205                                     "items": [],
206                                     "items_available": 0,
207                                 }
208                         else:
209                             # In this case, either we know the most
210                             # recent matching ID, or we know there
211                             # were no matching events in the 2-hour
212                             # window before subscribing. Either way we
213                             # can safely ask for events in ascending
214                             # order.
215                             items = self.api.logs().list(
216                                 order="id asc",
217                                 filters=f+skip_old_events).execute()
218                         break
219                     except errors.ApiError as error:
220                         pass
221                     else:
222                         tries_left = 0
223                         break
224                 if tries_left == 0:
225                     _logger.exception("PollClient thread could not contact API server.")
226                     with self._closing_lock:
227                         self._closing.set()
228                     thread.interrupt_main()
229                     return
230                 for i in items["items"]:
231                     skip_old_events = [["id", ">", str(i["id"])]]
232                     with self._closing_lock:
233                         if self._closing.is_set():
234                             return
235                         try:
236                             self.on_event(i)
237                         except Exception as e:
238                             _logger.exception("Unexpected exception from event callback.")
239                             thread.interrupt_main()
240                 if items["items_available"] > len(items["items"]):
241                     moreitems = True
242             if not moreitems:
243                 self._closing.wait(self.poll_time)
244
245     def run_forever(self):
246         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
247         while not self._closing.is_set():
248             self._closing.wait(1)
249
250     def close(self, code=None, reason=None, timeout=0):
251         """Close poll client and optionally wait for it to finish.
252
253         If an :on_event: handler is running in a different thread,
254         first wait (indefinitely) for it to return.
255
256         After closing, wait up to :timeout: seconds for the thread to
257         finish the poll request in progress (if any).
258
259         :code: and :reason: are ignored. They are present for
260         interface compatibility with EventClient.
261         """
262
263         with self._closing_lock:
264             self._closing.set()
265         try:
266             self.join(timeout=timeout)
267         except RuntimeError:
268             # "join() raises a RuntimeError if an attempt is made to join the
269             # current thread as that would cause a deadlock. It is also an
270             # error to join() a thread before it has been started and attempts
271             # to do so raises the same exception."
272             pass
273
274     def subscribe(self, f):
275         self.on_event({'status': 200})
276         self.filters.append(f)
277
278     def unsubscribe(self, f):
279         del self.filters[self.filters.index(f)]
280
281
282 def _subscribe_websocket(api, filters, on_event, last_log_id=None):
283     endpoint = api._rootDesc.get('websocketUrl', None)
284     if not endpoint:
285         raise errors.FeatureNotEnabledError(
286             "Server does not advertise a websocket endpoint")
287     uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
288     try:
289         client = EventClient(uri_with_token, filters, on_event, last_log_id)
290     except Exception:
291         _logger.warn("Failed to connect to websockets on %s" % endpoint)
292         raise
293     else:
294         return client
295
296
297 def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
298     """
299     :api:
300       a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
301     :filters:
302       Initial subscription filters.
303     :on_event:
304       The callback when a message is received.
305     :poll_fallback:
306       If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
307     :last_log_id:
308       Log rows that are newer than the log id
309     """
310
311     if not poll_fallback:
312         return _subscribe_websocket(api, filters, on_event, last_log_id)
313
314     try:
315         if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
316             return _subscribe_websocket(api, filters, on_event, last_log_id)
317         else:
318             _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
319     except Exception as e:
320         _logger.warn("Falling back to polling after websocket error: %s" % e)
321     p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
322     p.start()
323     return p