8724: test updates
[arvados.git] / sdk / python / arvados / events.py
1 import arvados
2 import config
3 import errors
4
5 import logging
6 import json
7 import threading
8 import time
9 import os
10 import re
11 import ssl
12 from ws4py.client.threadedclient import WebSocketClient
13
14 _logger = logging.getLogger('arvados.events')
15
16
17 class EventClient(WebSocketClient):
18     def __init__(self, url, filters, on_event, last_log_id):
19         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
20         if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
21             ssl_options['cert_reqs'] = ssl.CERT_NONE
22         else:
23             ssl_options['cert_reqs'] = ssl.CERT_REQUIRED
24
25         # Warning: If the host part of url resolves to both IPv6 and
26         # IPv4 addresses (common with "localhost"), only one of them
27         # will be attempted -- and it might not be the right one. See
28         # ws4py's WebSocketBaseClient.__init__.
29         super(EventClient, self).__init__(url, ssl_options=ssl_options)
30         self.filters = filters
31         self.on_event = on_event
32         self.last_log_id = last_log_id
33         self._closing_lock = threading.RLock()
34         self._closing = False
35         self._closed = threading.Event()
36
37     def opened(self):
38         self.subscribe(self.filters, self.last_log_id)
39
40     def closed(self, code, reason=None):
41         self._closed.set()
42
43     def received_message(self, m):
44         with self._closing_lock:
45             if not self._closing:
46                 self.on_event(json.loads(str(m)))
47
48     def close(self, code=1000, reason='', timeout=0):
49         """Close event client and optionally wait for it to finish.
50
51         :timeout: is the number of seconds to wait for ws4py to
52         indicate that the connection has closed.
53         """
54         super(EventClient, self).close(code, reason)
55         with self._closing_lock:
56             # make sure we don't process any more messages.
57             self._closing = True
58         # wait for ws4py to tell us the connection is closed.
59         self._closed.wait(timeout=timeout)
60
61     def subscribe(self, filters, last_log_id=None):
62         m = {"method": "subscribe", "filters": filters}
63         if last_log_id is not None:
64             m["last_log_id"] = last_log_id
65         self.send(json.dumps(m))
66
67     def unsubscribe(self, filters):
68         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
69
70
71 class PollClient(threading.Thread):
72     def __init__(self, api, filters, on_event, poll_time, last_log_id):
73         super(PollClient, self).__init__()
74         self.api = api
75         if filters:
76             self.filters = [filters]
77         else:
78             self.filters = [[]]
79         self.on_event = on_event
80         self.poll_time = poll_time
81         self.daemon = True
82         self.last_log_id = last_log_id
83         self._closing = threading.Event()
84         self._closing_lock = threading.RLock()
85
86     def run(self):
87         self.id = 0
88         if self.last_log_id != None:
89             self.id = self.last_log_id
90         else:
91             for f in self.filters:
92                 items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
93                 if items:
94                     if items[0]['id'] > self.id:
95                         self.id = items[0]['id']
96
97         self.on_event({'status': 200})
98
99         while not self._closing.is_set():
100             max_id = self.id
101             moreitems = False
102             for f in self.filters:
103                 items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
104                 for i in items["items"]:
105                     if i['id'] > max_id:
106                         max_id = i['id']
107                     with self._closing_lock:
108                         if self._closing.is_set():
109                             return
110                         self.on_event(i)
111                 if items["items_available"] > len(items["items"]):
112                     moreitems = True
113             self.id = max_id
114             if not moreitems:
115                 self._closing.wait(self.poll_time)
116
117     def run_forever(self):
118         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
119         while not self._closing.is_set():
120             self._closing.wait(1)
121
122     def close(self, code=None, reason=None, timeout=0):
123         """Close poll client and optionally wait for it to finish.
124
125         If an :on_event: handler is running in a different thread,
126         first wait (indefinitely) for it to return.
127
128         After closing, wait up to :timeout: seconds for the thread to
129         finish the poll request in progress (if any).
130
131         :code: and :reason: are ignored. They are present for
132         interface compatibility with EventClient.
133         """
134
135         with self._closing_lock:
136             self._closing.set()
137         try:
138             self.join(timeout=timeout)
139         except RuntimeError:
140             # "join() raises a RuntimeError if an attempt is made to join the
141             # current thread as that would cause a deadlock. It is also an
142             # error to join() a thread before it has been started and attempts
143             # to do so raises the same exception."
144             pass
145
146     def subscribe(self, filters):
147         self.on_event({'status': 200})
148         self.filters.append(filters)
149
150     def unsubscribe(self, filters):
151         del self.filters[self.filters.index(filters)]
152
153
154 def _subscribe_websocket(api, filters, on_event, last_log_id=None):
155     endpoint = api._rootDesc.get('websocketUrl', None)
156     if not endpoint:
157         raise errors.FeatureNotEnabledError(
158             "Server does not advertise a websocket endpoint")
159     try:
160         uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
161         client = EventClient(uri_with_token, filters, on_event, last_log_id)
162         ok = False
163         try:
164             client.connect()
165             ok = True
166             return client
167         finally:
168             if not ok:
169                 client.close_connection()
170     except:
171         _logger.warn("Failed to connect to websockets on %s" % endpoint)
172         raise
173
174
175 def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
176     """
177     :api:
178       a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
179     :filters:
180       Initial subscription filters.
181     :on_event:
182       The callback when a message is received.
183     :poll_fallback:
184       If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
185     :last_log_id:
186       Log rows that are newer than the log id
187     """
188
189     if not poll_fallback:
190         return _subscribe_websocket(api, filters, on_event, last_log_id)
191
192     try:
193         if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
194             return _subscribe_websocket(api, filters, on_event, last_log_id)
195         else:
196             _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
197     except Exception as e:
198         _logger.warn("Falling back to polling after websocket error: %s" % e)
199     p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
200     p.start()
201     return p