Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / sdk / python / tests / test_events.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import print_function
6 from __future__ import absolute_import
7 from __future__ import division
8 from future import standard_library
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import logging
13 import mock
14 import queue
15 import sys
16 import threading
17 import time
18 import unittest
19
20 import arvados
21 from . import arvados_testutil as tutil
22 from . import run_test_server
23
24
class WebsocketTest(run_test_server.TestCaseWithServers):
    """Tests for arvados.events.subscribe() and the EventClient/PollClient classes.

    Several tests create "human" records through the API and then check
    which log events are delivered to the subscriber callback.
    """
    MAIN_SERVER = {}

    # Reference timestamps one hour either side of the moment this class
    # body was evaluated.
    TIME_PAST = time.time()-3600
    TIME_FUTURE = time.time()+3600
    MOCK_WS_URL = 'wss://[{}]/'.format(tutil.TEST_HOST)

    TEST_TIMEOUT = 10.0

    def setUp(self):
        # Tests store their event client here so tearDown can close it.
        self.ws = None

    def tearDown(self):
        # Best-effort close: a failure is reported but must not prevent
        # the remaining teardown steps from running.
        try:
            if self.ws:
                self.ws.close()
        except Exception as e:
            print("Error in teardown: ", e)
        super(WebsocketTest, self).tearDown()
        run_test_server.reset()

    def _capture_root_log(self):
        """Attach a StringIO-backed handler to the root logger; return the stream.

        Removal of the handler is registered with addCleanup, so it is
        detached even when the calling test fails part-way through.
        """
        logstream = tutil.StringIO()
        root_logger = logging.getLogger()
        handler = logging.StreamHandler(logstream)
        root_logger.addHandler(handler)
        self.addCleanup(root_logger.removeHandler, handler)
        return logstream

    def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
        """Subscribe to "human" log events and verify which ones arrive.

        Arguments:
          poll_fallback: passed through to arvados.events.subscribe().
          expect_type: class the returned client must be an instance of.
          start_time: optional timestamp string. When given, a
            "created_at >= start_time" filter is added and last_log_id=1
            is passed to subscribe().
          expected: 0 means no object events are awaited; 1 means only
            the object created after subscribing; 2 means the object
            created before subscribing (the "ancestor") as well.
        """
        run_test_server.authorize_with('active')
        events = queue.Queue(100)

        # Create ancestor before subscribing.
        # When listening with start_time in the past, this should also be retrieved.
        # However, when start_time is omitted in subscribe, this should not be fetched.
        ancestor = arvados.api('v1').humans().create(body={}).execute()

        filters = [['object_uuid', 'is_a', 'arvados#human']]
        if start_time:
            filters.append(['created_at', '>=', start_time])

        self.ws = arvados.events.subscribe(
            arvados.api('v1'), filters,
            events.put_nowait,
            poll_fallback=poll_fallback,
            last_log_id=(1 if start_time else None))
        self.assertIsInstance(self.ws, expect_type)
        # The first message delivered is the subscription status.
        self.assertEqual(200, events.get(True, 5)['status'])
        human = arvados.api('v1').humans().create(body={}).execute()

        want_uuids = []
        if expected > 0:
            want_uuids.append(human['uuid'])
        if expected > 1:
            want_uuids.append(ancestor['uuid'])
        # Drain events until every wanted uuid has been seen; a missing
        # event surfaces as queue.Empty after the 5 second timeout.
        log_object_uuids = []
        while set(want_uuids) - set(log_object_uuids):
            log_object_uuids.append(events.get(True, 5)['object_uuid'])

        if expected < 2:
            with self.assertRaises(queue.Empty):
                # assertEqual just serves to show us what unexpected
                # thing comes out of the queue when the assertRaises
                # fails; when the test passes, this assertEqual
                # doesn't get called.
                self.assertEqual(events.get(True, 2), None)

    def test_subscribe_websocket(self):
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)

    @mock.patch('arvados.events.EventClient.__init__')
    def test_subscribe_poll(self, event_client_constr):
        # Make the websocket client constructor fail so that subscribe()
        # has to return a PollClient instead.
        event_client_constr.side_effect = Exception('All is well')
        self._test_subscribe(
            poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)

    def test_subscribe_poll_retry(self):
        api_mock = mock.MagicMock()
        n = []
        def on_ev(ev):
            n.append(ev)

        error_mock = mock.MagicMock()
        error_mock.resp.status = 0
        error_mock._get_reason.return_value = "testing"
        # Alternate API errors with successful responses.
        api_mock.logs().list().execute.side_effect = (
            arvados.errors.ApiError(error_mock, b""),
            {"items": [{"id": 1}], "items_available": 1},
            arvados.errors.ApiError(error_mock, b""),
            {"items": [{"id": 1}], "items_available": 1},
        )
        pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
        pc.start()
        # NOTE(review): this wait has no deadline, so the test hangs
        # instead of failing if fewer than two events are ever delivered.
        while len(n) < 2:
            time.sleep(.1)
        pc.close()

    def test_subscribe_websocket_with_start_time_past(self):
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.localiso(self.TIME_PAST),
            expected=2)

    @mock.patch('arvados.events.EventClient.__init__')
    def test_subscribe_poll_with_start_time_past(self, event_client_constr):
        event_client_constr.side_effect = Exception('All is well')
        self._test_subscribe(
            poll_fallback=0.25, expect_type=arvados.events.PollClient,
            start_time=self.localiso(self.TIME_PAST),
            expected=2)

    def test_subscribe_websocket_with_start_time_future(self):
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.localiso(self.TIME_FUTURE),
            expected=0)

    @mock.patch('arvados.events.EventClient.__init__')
    def test_subscribe_poll_with_start_time_future(self, event_client_constr):
        event_client_constr.side_effect = Exception('All is well')
        self._test_subscribe(
            poll_fallback=0.25, expect_type=arvados.events.PollClient,
            start_time=self.localiso(self.TIME_FUTURE),
            expected=0)

    def test_subscribe_websocket_with_start_time_past_utc(self):
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.utciso(self.TIME_PAST),
            expected=2)

    def test_subscribe_websocket_with_start_time_future_utc(self):
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.utciso(self.TIME_FUTURE),
            expected=0)

    def utciso(self, t):
        """Format epoch seconds t as a UTC timestamp with a 'Z' suffix."""
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))

    def localiso(self, t):
        """Format epoch seconds t as local time with a numeric UTC offset."""
        local = time.localtime(t)
        # time.timezone is the non-DST offset only; when DST is in effect
        # at t the wall-clock fields above already include it, so the
        # designator must come from time.altzone to describe the same
        # instant.
        seconds_west = time.altzone if local.tm_isdst > 0 else time.timezone
        return time.strftime('%Y-%m-%dT%H:%M:%S', local) + self.isotz(-seconds_west//60)

    def isotz(self, offset):
        """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
        # Split the magnitude, not the signed value: floor division and
        # modulo on a negative offset would turn -210 into "-04:30".
        sign = '-' if offset < 0 else '+'
        hours, minutes = divmod(abs(offset), 60)
        return '{}{:02d}:{:02d}'.format(sign, hours, minutes)

    # Test websocket reconnection on (un)expected close
    def _test_websocket_reconnect(self, close_unexpected):
        """Check event delivery and logging after the websocket is closed.

        close_unexpected: when true, only the underlying connection is
          closed (self.ws.ec.close_connection()) and events are expected
          to keep arriving; when false, the client itself is closed and
          no further events are expected.
        """
        run_test_server.authorize_with('active')
        events = queue.Queue(100)

        logstream = self._capture_root_log()

        filters = [['object_uuid', 'is_a', 'arvados#human']]
        filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
        self.ws = arvados.events.subscribe(
            arvados.api('v1'), filters,
            events.put_nowait,
            poll_fallback=False,
            last_log_id=None)
        self.assertIsInstance(self.ws, arvados.events.EventClient)
        self.assertEqual(200, events.get(True, 5)['status'])

        # create obj
        human = arvados.api('v1').humans().create(body={}).execute()

        # expect an event
        self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
        with self.assertRaises(queue.Empty):
            self.assertEqual(events.get(True, 2), None)

        # close (im)properly
        if close_unexpected:
            self.ws.ec.close_connection()
        else:
            self.ws.close()

        # create one more obj
        human2 = arvados.api('v1').humans().create(body={}).execute()

        # (un)expect the object creation event
        if close_unexpected:
            # Two messages are read here; only those carrying an
            # object_uuid are recorded.
            log_object_uuids = []
            for _ in range(2):
                event = events.get(True, 5)
                if event.get('object_uuid') is not None:
                    log_object_uuids.append(event['object_uuid'])
            with self.assertRaises(queue.Empty):
                self.assertEqual(events.get(True, 2), None)
            self.assertNotIn(human['uuid'], log_object_uuids)
            self.assertIn(human2['uuid'], log_object_uuids)
        else:
            with self.assertRaises(queue.Empty):
                self.assertEqual(events.get(True, 2), None)

        # verify the reconnect message was logged only for an unexpected close
        log_messages = logstream.getvalue()
        if close_unexpected:
            self.assertIn("Unexpected close. Reconnecting.", log_messages)
        else:
            self.assertNotIn("Unexpected close. Reconnecting.", log_messages)

    def test_websocket_reconnect_on_unexpected_close(self):
        self._test_websocket_reconnect(True)

    def test_websocket_no_reconnect_on_close_by_user(self):
        self._test_websocket_reconnect(False)

    # Test websocket reconnection retry
    @mock.patch('arvados.events._EventClient.connect')
    def test_websocket_reconnect_retry(self, event_client_connect):
        # First connect succeeds, the second raises, the third succeeds.
        event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]

        logstream = self._capture_root_log()

        run_test_server.authorize_with('active')
        events = queue.Queue(100)

        filters = [['object_uuid', 'is_a', 'arvados#human']]
        self.ws = arvados.events.subscribe(
            arvados.api('v1'), filters,
            events.put_nowait,
            poll_fallback=False,
            last_log_id=None)
        self.assertIsInstance(self.ws, arvados.events.EventClient)

        # simulate improper close
        self.ws.on_closed()

        # verify log messages to ensure retry happened
        self.assertIn(
            "Error 'EventClient.connect error' during websocket reconnect.",
            logstream.getvalue())

    @mock.patch('arvados.events._EventClient')
    def test_subscribe_method(self, websocket_client):
        filters = [['object_uuid', 'is_a', 'arvados#human']]
        client = arvados.events.EventClient(
            self.MOCK_WS_URL, [], lambda event: None, None)
        client.subscribe(filters[:], 99)
        websocket_client().subscribe.assert_called_with(filters, 99)

    @mock.patch('arvados.events._EventClient')
    def test_unsubscribe(self, websocket_client):
        filters = [['object_uuid', 'is_a', 'arvados#human']]
        client = arvados.events.EventClient(
            self.MOCK_WS_URL, filters[:], lambda event: None, None)
        client.unsubscribe(filters[:])
        websocket_client().unsubscribe.assert_called_with(filters)

    @mock.patch('arvados.events._EventClient')
    def test_run_forever_survives_reconnects(self, websocket_client):
        connected = threading.Event()
        websocket_client().connect.side_effect = connected.set
        client = arvados.events.EventClient(
            self.MOCK_WS_URL, [], lambda event: None, None)
        forever_thread = threading.Thread(target=client.run_forever)
        forever_thread.start()
        # Simulate an unexpected disconnect, and wait for reconnect.
        close_thread = threading.Thread(target=client.on_closed)
        close_thread.start()
        self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
        close_thread.join()
        # Sample liveness before close(); the assertion is made only
        # after the thread has been joined.
        run_forever_alive = forever_thread.is_alive()
        client.close()
        forever_thread.join()
        self.assertTrue(run_forever_alive)
        self.assertEqual(2, websocket_client().connect.call_count)
298
299
class PollClientTestCase(unittest.TestCase):
    """Unit tests for arvados.events.PollClient, run against a mocked API client."""
    TEST_TIMEOUT = 10.0

    class MockLogs(object):
        """Stand-in for the logs().list().execute() call used by the poller."""

        def __init__(self):
            self.logs = []
            self.lock = threading.Lock()
            self.api_called = threading.Event()

        def add(self, log):
            """Queue one log record to be handed out by a later return_list()."""
            with self.lock:
                self.logs.append(log)

        def return_list(self, num_retries=None):
            """Return a response dict for the most recent list() call.

            The filters are read from the last entry in
            self.list_func.call_args_list; list_func is assigned by the
            test case's setUp.
            """
            self.api_called.set()
            _, call_kwargs = self.list_func.call_args_list[-1]
            has_id_filter = any(
                f[0] == 'id' and f[1] == '>'
                for f in call_kwargs.get('filters', []))
            if not has_id_filter:
                # No 'id' filter was given -- this must be the probe
                # to determine the most recent id.
                return {'items': [{'id': 1}], 'items_available': 1}
            # Hand over everything queued so far and start a fresh list.
            with self.lock:
                pending, self.logs = self.logs, []
            return {'items': pending, 'items_available': len(pending)}

    def setUp(self):
        self.logs = self.MockLogs()
        self.arv = mock.MagicMock(name='arvados.api()')
        self.arv.logs().list().execute.side_effect = self.logs.return_list
        # our MockLogs object's "execute" stub will need to inspect
        # the call history to determine X in
        # ....logs().list(filters=X).execute():
        self.logs.list_func = self.arv.logs().list
        self.status_ok = threading.Event()
        self.event_received = threading.Event()
        self.recv_events = []

    def tearDown(self):
        # Tests that already disposed of their client delete the attribute.
        if hasattr(self, 'client'):
            self.client.close(timeout=None)

    def callback(self, event):
        """Record an event: status 200 sets status_ok, anything else is stored."""
        if event.get('status') != 200:
            self.recv_events.append(event)
            self.event_received.set()
        else:
            self.status_ok.set()

    def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
        """Create self.client, defaulting to no filters and self.callback."""
        self.client = arvados.events.PollClient(
            self.arv,
            [] if filters is None else filters,
            self.callback if callback is None else callback,
            poll_time,
            last_log_id)

    def was_filter_used(self, target):
        """Return True if any recorded list() call had target among its filters."""
        for call in self.arv.logs().list.call_args_list:
            if target in call[-1].get('filters', []):
                return True
        return False

    def test_callback(self):
        test_log = {'id': 12345, 'testkey': 'testtext'}
        self.logs.add({'id': 123})
        self.build_client(poll_time=.01)
        self.client.start()
        for flag in (self.status_ok, self.event_received):
            self.assertTrue(flag.wait(self.TEST_TIMEOUT))
        # Re-arm the flag before queueing the record under test.
        self.event_received.clear()
        self.logs.add(test_log.copy())
        self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
        self.assertIn(test_log, self.recv_events)

    def test_subscribe(self):
        client_filter = ['kind', '=', 'arvados#test']
        self.build_client()
        self.client.unsubscribe([])
        self.client.subscribe([list(client_filter)])
        self.client.start()
        for flag in (self.status_ok, self.logs.api_called):
            self.assertTrue(flag.wait(self.TEST_TIMEOUT))
        self.assertTrue(self.was_filter_used(client_filter))

    def test_unsubscribe(self):
        should_filter = ['foo', '=', 'foo']
        should_not_filter = ['foo', '=', 'bar']
        self.build_client(poll_time=0.01)
        self.client.unsubscribe([])
        self.client.subscribe([list(should_not_filter)])
        self.client.subscribe([list(should_filter)])
        self.client.unsubscribe([list(should_not_filter)])
        self.client.start()
        self.logs.add({'id': 123})
        for flag in (self.status_ok, self.event_received):
            self.assertTrue(flag.wait(self.TEST_TIMEOUT))
        self.assertTrue(self.was_filter_used(should_filter))
        self.assertFalse(self.was_filter_used(should_not_filter))

    def test_run_forever(self):
        self.build_client()
        self.client.start()
        runner = threading.Thread(target=self.client.run_forever)
        runner.start()
        self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
        self.assertTrue(runner.is_alive())
        self.client.close()
        runner.join()
        # Already closed here, so remove it to keep tearDown from
        # closing it a second time.
        del self.client