8460: Support created_at filters.
[arvados.git] / sdk / python / tests / test_events.py
1 import arvados
2 import io
3 import logging
4 import mock
5 import Queue
6 import run_test_server
7 import threading
8 import time
9 import unittest
10
11 import arvados_testutil
12
class WebsocketTest(run_test_server.TestCaseWithServers):
    """Exercise arvados.events.subscribe() and the clients it returns.

    The tests cover the websocket client (EventClient) and the polling
    client (PollClient), with and without created_at filters.
    """
    # NOTE(review): presumably read by TestCaseWithServers to decide which
    # test servers to start -- confirm in run_test_server.
    MAIN_SERVER = {}

    # Timestamps one hour before/after the moment this class body is
    # evaluated; used as created_at bounds by the start_time tests.
    TIME_PAST = time.time()-3600
    TIME_FUTURE = time.time()+3600
    # Websocket URL passed to EventClient by the tests that mock _EventClient.
    MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
19
20     def setUp(self):
21         self.ws = None
22
23     def tearDown(self):
24         try:
25             if self.ws:
26                 self.ws.close()
27         except Exception as e:
28             print("Error in teardown: ", e)
29         super(WebsocketTest, self).tearDown()
30         run_test_server.reset()
31
32     def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
33         run_test_server.authorize_with('active')
34         events = Queue.Queue(100)
35
36         # Create ancestor before subscribing.
37         # When listening with start_time in the past, this should also be retrieved.
38         # However, when start_time is omitted in subscribe, this should not be fetched.
39         ancestor = arvados.api('v1').humans().create(body={}).execute()
40
41         filters = [['object_uuid', 'is_a', 'arvados#human']]
42         if start_time:
43             filters.append(['created_at', '>=', start_time])
44
45         self.ws = arvados.events.subscribe(
46             arvados.api('v1'), filters,
47             events.put_nowait,
48             poll_fallback=poll_fallback,
49             last_log_id=(1 if start_time else None))
50         self.assertIsInstance(self.ws, expect_type)
51         self.assertEqual(200, events.get(True, 5)['status'])
52         human = arvados.api('v1').humans().create(body={}).execute()
53
54         log_object_uuids = []
55         for i in range(0, expected):
56             log_object_uuids.append(events.get(True, 5)['object_uuid'])
57
58         if expected > 0:
59             self.assertIn(human['uuid'], log_object_uuids)
60
61         if expected > 1:
62             self.assertIn(ancestor['uuid'], log_object_uuids)
63
64         with self.assertRaises(Queue.Empty):
65             # assertEqual just serves to show us what unexpected thing
66             # comes out of the queue when the assertRaises fails; when
67             # the test passes, this assertEqual doesn't get called.
68             self.assertEqual(events.get(True, 2), None)
69
70     def test_subscribe_websocket(self):
71         self._test_subscribe(
72             poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)
73
74     @mock.patch('arvados.events.EventClient.__init__')
75     def test_subscribe_poll(self, event_client_constr):
76         event_client_constr.side_effect = Exception('All is well')
77         self._test_subscribe(
78             poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
79
80     def test_subscribe_poll_retry(self):
81         api_mock = mock.MagicMock()
82         n = []
83         def on_ev(ev):
84             n.append(ev)
85
86         error_mock = mock.MagicMock()
87         error_mock.resp.status = 0
88         error_mock._get_reason.return_value = "testing"
89         api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
90                                                       {"items": [{"id": 1}], "items_available": 1},
91                                                       arvados.errors.ApiError(error_mock, ""),
92                                                       {"items": [{"id": 1}], "items_available": 1})
93         pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
94         pc.start()
95         while len(n) < 2:
96             time.sleep(.1)
97         pc.close()
98
99     def test_subscribe_websocket_with_start_time_past(self):
100         self._test_subscribe(
101             poll_fallback=False, expect_type=arvados.events.EventClient,
102             start_time=self.localiso(self.TIME_PAST),
103             expected=2)
104
105     @mock.patch('arvados.events.EventClient.__init__')
106     def test_subscribe_poll_with_start_time_past(self, event_client_constr):
107         event_client_constr.side_effect = Exception('All is well')
108         self._test_subscribe(
109             poll_fallback=0.25, expect_type=arvados.events.PollClient,
110             start_time=self.localiso(self.TIME_PAST),
111             expected=2)
112
113     def test_subscribe_websocket_with_start_time_future(self):
114         self._test_subscribe(
115             poll_fallback=False, expect_type=arvados.events.EventClient,
116             start_time=self.localiso(self.TIME_FUTURE),
117             expected=0)
118
119     @mock.patch('arvados.events.EventClient.__init__')
120     def test_subscribe_poll_with_start_time_future(self, event_client_constr):
121         event_client_constr.side_effect = Exception('All is well')
122         self._test_subscribe(
123             poll_fallback=0.25, expect_type=arvados.events.PollClient,
124             start_time=self.localiso(self.TIME_FUTURE),
125             expected=0)
126
127     def test_subscribe_websocket_with_start_time_past_utc(self):
128         self._test_subscribe(
129             poll_fallback=False, expect_type=arvados.events.EventClient,
130             start_time=self.utciso(self.TIME_PAST),
131             expected=2)
132
133     def test_subscribe_websocket_with_start_time_future_utc(self):
134         self._test_subscribe(
135             poll_fallback=False, expect_type=arvados.events.EventClient,
136             start_time=self.utciso(self.TIME_FUTURE),
137             expected=0)
138
139     def utciso(self, t):
140         return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))
141
142     def localiso(self, t):
143         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
144
145     def isotz(self, offset):
146         """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
147         return '{:+03d}:{:02d}'.format(offset/60, offset%60)
148
149     # Test websocket reconnection on (un)execpted close
150     def _test_websocket_reconnect(self, close_unexpected):
151         run_test_server.authorize_with('active')
152         events = Queue.Queue(100)
153
154         logstream = io.BytesIO()
155         rootLogger = logging.getLogger()
156         streamHandler = logging.StreamHandler(logstream)
157         rootLogger.addHandler(streamHandler)
158
159         filters = [['object_uuid', 'is_a', 'arvados#human']]
160         filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
161         self.ws = arvados.events.subscribe(
162             arvados.api('v1'), filters,
163             events.put_nowait,
164             poll_fallback=False,
165             last_log_id=None)
166         self.assertIsInstance(self.ws, arvados.events.EventClient)
167         self.assertEqual(200, events.get(True, 5)['status'])
168
169         # create obj
170         human = arvados.api('v1').humans().create(body={}).execute()
171
172         # expect an event
173         self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
174         with self.assertRaises(Queue.Empty):
175             self.assertEqual(events.get(True, 2), None)
176
177         # close (im)properly
178         if close_unexpected:
179             self.ws.ec.close_connection()
180         else:
181             self.ws.close()
182
183         # create one more obj
184         human2 = arvados.api('v1').humans().create(body={}).execute()
185
186         # (un)expect the object creation event
187         if close_unexpected:
188             log_object_uuids = []
189             for i in range(0, 2):
190                 event = events.get(True, 5)
191                 if event.get('object_uuid') != None:
192                     log_object_uuids.append(event['object_uuid'])
193             with self.assertRaises(Queue.Empty):
194                 self.assertEqual(events.get(True, 2), None)
195             self.assertNotIn(human['uuid'], log_object_uuids)
196             self.assertIn(human2['uuid'], log_object_uuids)
197         else:
198             with self.assertRaises(Queue.Empty):
199                 self.assertEqual(events.get(True, 2), None)
200
201         # verify log message to ensure that an (un)expected close
202         log_messages = logstream.getvalue()
203         closeLogFound = log_messages.find("Unexpected close. Reconnecting.")
204         retryLogFound = log_messages.find("Error during websocket reconnect. Will retry")
205         if close_unexpected:
206             self.assertNotEqual(closeLogFound, -1)
207         else:
208             self.assertEqual(closeLogFound, -1)
209         rootLogger.removeHandler(streamHandler)
210
211     def test_websocket_reconnect_on_unexpected_close(self):
212         self._test_websocket_reconnect(True)
213
214     def test_websocket_no_reconnect_on_close_by_user(self):
215         self._test_websocket_reconnect(False)
216
217     # Test websocket reconnection retry
218     @mock.patch('arvados.events._EventClient.connect')
219     def test_websocket_reconnect_retry(self, event_client_connect):
220         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
221
222         logstream = io.BytesIO()
223         rootLogger = logging.getLogger()
224         streamHandler = logging.StreamHandler(logstream)
225         rootLogger.addHandler(streamHandler)
226
227         run_test_server.authorize_with('active')
228         events = Queue.Queue(100)
229
230         filters = [['object_uuid', 'is_a', 'arvados#human']]
231         self.ws = arvados.events.subscribe(
232             arvados.api('v1'), filters,
233             events.put_nowait,
234             poll_fallback=False,
235             last_log_id=None)
236         self.assertIsInstance(self.ws, arvados.events.EventClient)
237
238         # simulate improper close
239         self.ws.on_closed()
240
241         # verify log messages to ensure retry happened
242         log_messages = logstream.getvalue()
243         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
244         self.assertNotEqual(found, -1)
245         rootLogger.removeHandler(streamHandler)
246
247     @mock.patch('arvados.events._EventClient')
248     def test_subscribe_method(self, websocket_client):
249         filters = [['object_uuid', 'is_a', 'arvados#human']]
250         client = arvados.events.EventClient(
251             self.MOCK_WS_URL, [], lambda event: None, None)
252         client.subscribe(filters[:], 99)
253         websocket_client().subscribe.assert_called_with(filters, 99)
254
255     @mock.patch('arvados.events._EventClient')
256     def test_unsubscribe(self, websocket_client):
257         filters = [['object_uuid', 'is_a', 'arvados#human']]
258         client = arvados.events.EventClient(
259             self.MOCK_WS_URL, filters[:], lambda event: None, None)
260         client.unsubscribe(filters[:])
261         websocket_client().unsubscribe.assert_called_with(filters)
262
    @mock.patch('arvados.events._EventClient')
    def test_run_forever_survives_reconnects(self, websocket_client):
        """run_forever() must still be running after an unexpected close.

        _EventClient is mocked; its connect() notifies a condition so the
        test can wait until a connect happens after on_closed() was called.
        """
        connection_cond = threading.Condition()
        def ws_connect():
            # Runs in place of the mocked connect(): wake up any waiter.
            with connection_cond:
                connection_cond.notify_all()
        websocket_client().connect.side_effect = ws_connect
        client = arvados.events.EventClient(
            self.MOCK_WS_URL, [], lambda event: None, None)
        # The condition is held while both threads are started, so
        # ws_connect cannot notify before this thread reaches wait().
        with connection_cond:
            forever_thread = threading.Thread(target=client.run_forever)
            forever_thread.start()
            # Simulate an unexpected disconnect, and wait for reconnect.
            close_thread = threading.Thread(target=client.on_closed)
            close_thread.start()
            connection_cond.wait()
        close_thread.join()
        # Sample liveness before close(); the assertion is made only after
        # the thread has been joined.
        run_forever_alive = forever_thread.is_alive()
        client.close()
        forever_thread.join()
        self.assertTrue(run_forever_alive)
        # NOTE(review): presumably one connect from constructing the
        # client plus one from the reconnect -- confirm in arvados.events.
        self.assertEqual(2, websocket_client().connect.call_count)
285
286
class PollClientTestCase(unittest.TestCase):
    """Tests built around a lock-protected in-memory list of log records."""

    class MockLogs(object):
        """Lock-protected buffer of log records that is drained on read."""

        def __init__(self):
            self.lock = threading.Lock()
            self.logs = []

        def add(self, log):
            """Append one log record to the buffer."""
            self.lock.acquire()
            try:
                self.logs.append(log)
            finally:
                self.lock.release()

        def return_list(self, num_retries=None):
            """Empty the buffer and return its contents as a list response.

            num_retries is accepted and ignored.
            """
            with self.lock:
                drained, self.logs = self.logs, []
            return {'items': drained, 'items_available': len(drained)}
302
303
304     def setUp(self):
305         self.logs = self.MockLogs()
306         self.arv = mock.MagicMock(name='arvados.api()')
307         self.arv.logs().list().execute.side_effect = self.logs.return_list
308         self.callback_cond = threading.Condition()
309         self.recv_events = []
310
311     def tearDown(self):
312         if hasattr(self, 'client'):
313             self.client.close(timeout=None)
314
315     def callback(self, event):
316         with self.callback_cond:
317             self.recv_events.append(event)
318             self.callback_cond.notify_all()
319
320     def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
321         if filters is None:
322             filters = []
323         if callback is None:
324             callback = self.callback
325         self.client = arvados.events.PollClient(
326             self.arv, filters, callback, poll_time, last_log_id)
327
328     def was_filter_used(self, target):
329         return any(target in call[-1].get('filters', [])
330                    for call in self.arv.logs().list.call_args_list)
331
    def test_callback(self):
        """A log record queued while the client is polling reaches the callback."""
        test_log = {'id': 12345, 'testkey': 'testtext'}
        # One record is available before the client starts.
        self.logs.add({'id': 123})
        self.build_client(poll_time=.01)
        # self.callback needs callback_cond to notify, so holding it here
        # means no notification can be missed before each wait().
        with self.callback_cond:
            self.client.start()
            self.callback_cond.wait()
            # Queue a copy so test_log itself is never handed to the client.
            self.logs.add(test_log.copy())
            self.callback_cond.wait()
        self.client.close(timeout=None)
        self.assertIn(test_log, self.recv_events)
343
344     def test_subscribe(self):
345         client_filter = ['kind', '=', 'arvados#test']
346         self.build_client()
347         self.client.subscribe([client_filter[:]])
348         with self.callback_cond:
349             self.client.start()
350             self.callback_cond.wait()
351         self.client.close(timeout=None)
352         self.assertTrue(self.was_filter_used(client_filter))
353
354     def test_unsubscribe(self):
355         client_filter = ['kind', '=', 'arvados#test']
356         self.build_client()
357         self.client.subscribe([client_filter[:]])
358         self.client.unsubscribe([client_filter[:]])
359         self.client.start()
360         self.client.close(timeout=None)
361         self.assertFalse(self.was_filter_used(client_filter))
362
363     def test_run_forever(self):
364         self.build_client()
365         with self.callback_cond:
366             self.client.start()
367             forever_thread = threading.Thread(target=self.client.run_forever)
368             forever_thread.start()
369             self.callback_cond.wait()
370         self.assertTrue(forever_thread.is_alive())
371         self.client.close()
372         forever_thread.join()