18995: Merge branch 'main' into 18995-code-cleanup-1
[arvados.git] / sdk / python / tests / test_events.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import print_function
6 from __future__ import absolute_import
7 from __future__ import division
8 from future import standard_library
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import logging
13 import mock
14 import queue
15 import sys
16 import threading
17 import time
18 import unittest
19
20 import arvados
21 from . import arvados_testutil as tutil
22 from . import run_test_server
23
24
class WebsocketTest(run_test_server.TestCaseWithServers):
    """Tests for arvados.events.subscribe(), EventClient and PollClient."""

    # Server configuration read by the TestCaseWithServers base class.
    # NOTE(review): an empty dict presumably requests the main API server
    # with default settings -- confirm in run_test_server.
    MAIN_SERVER = {}

    # Epoch timestamps one hour before / after the moment this class body
    # is evaluated; the tests turn them into 'created_at' filter values.
    TIME_PAST = time.time()-3600
    TIME_FUTURE = time.time()+3600
    # Websocket URL handed to EventClient in the tests that patch out
    # arvados.events._EventClient.
    MOCK_WS_URL = 'wss://[{}]/'.format(tutil.TEST_HOST)

    # Seconds to wait on threading events before failing a test.
    TEST_TIMEOUT = 10.0
33
    def setUp(self):
        # No subscription yet; tearDown() only closes self.ws when a test
        # has replaced this with a real client.
        self.ws = None
36
37     def tearDown(self):
38         try:
39             if self.ws:
40                 self.ws.close()
41         except Exception as e:
42             print("Error in teardown: ", e)
43         super(WebsocketTest, self).tearDown()
44         run_test_server.reset()
45
46     def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
47         run_test_server.authorize_with('active')
48         events = queue.Queue(100)
49
50         # Create ancestor before subscribing.
51         # When listening with start_time in the past, this should also be retrieved.
52         # However, when start_time is omitted in subscribe, this should not be fetched.
53         ancestor = arvados.api('v1').humans().create(body={}).execute()
54
55         filters = [['object_uuid', 'is_a', 'arvados#human']]
56         if start_time:
57             filters.append(['created_at', '>=', start_time])
58
59         self.ws = arvados.events.subscribe(
60             arvados.api('v1'), filters,
61             events.put_nowait,
62             poll_fallback=poll_fallback,
63             last_log_id=(1 if start_time else None))
64         self.assertIsInstance(self.ws, expect_type)
65         self.assertEqual(200, events.get(True, 5)['status'])
66
67         if hasattr(self.ws, '_skip_old_events'):
68             # Avoid race by waiting for the first "find ID threshold"
69             # poll to finish.
70             deadline = time.time() + 10
71             while not self.ws._skip_old_events:
72                 self.assertLess(time.time(), deadline)
73                 time.sleep(0.1)
74         human = arvados.api('v1').humans().create(body={}).execute()
75
76         want_uuids = []
77         if expected > 0:
78             want_uuids.append(human['uuid'])
79         if expected > 1:
80             want_uuids.append(ancestor['uuid'])
81         log_object_uuids = []
82         while set(want_uuids) - set(log_object_uuids):
83             log_object_uuids.append(events.get(True, 5)['object_uuid'])
84
85         if expected < 2:
86             with self.assertRaises(queue.Empty):
87                 # assertEqual just serves to show us what unexpected
88                 # thing comes out of the queue when the assertRaises
89                 # fails; when the test passes, this assertEqual
90                 # doesn't get called.
91                 self.assertEqual(events.get(True, 2), None)
92
93     def test_subscribe_websocket(self):
94         self._test_subscribe(
95             poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)
96
97     @mock.patch('arvados.events.EventClient.__init__')
98     def test_subscribe_poll(self, event_client_constr):
99         event_client_constr.side_effect = Exception('All is well')
100         self._test_subscribe(
101             poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
102
103     def test_subscribe_poll_retry(self):
104         api_mock = mock.MagicMock()
105         n = []
106         def on_ev(ev):
107             n.append(ev)
108
109         error_mock = mock.MagicMock()
110         error_mock.resp.status = 0
111         error_mock._get_reason.return_value = "testing"
112         api_mock.logs().list().execute.side_effect = (
113             arvados.errors.ApiError(error_mock, b""),
114             {"items": [{"id": 1}], "items_available": 1},
115             arvados.errors.ApiError(error_mock, b""),
116             {"items": [{"id": 1}], "items_available": 1},
117         )
118         pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
119         pc.start()
120         while len(n) < 2:
121             time.sleep(.1)
122         pc.close()
123
124     def test_subscribe_websocket_with_start_time_past(self):
125         self._test_subscribe(
126             poll_fallback=False, expect_type=arvados.events.EventClient,
127             start_time=self.localiso(self.TIME_PAST),
128             expected=2)
129
130     @mock.patch('arvados.events.EventClient.__init__')
131     def test_subscribe_poll_with_start_time_past(self, event_client_constr):
132         event_client_constr.side_effect = Exception('All is well')
133         self._test_subscribe(
134             poll_fallback=0.25, expect_type=arvados.events.PollClient,
135             start_time=self.localiso(self.TIME_PAST),
136             expected=2)
137
138     def test_subscribe_websocket_with_start_time_future(self):
139         self._test_subscribe(
140             poll_fallback=False, expect_type=arvados.events.EventClient,
141             start_time=self.localiso(self.TIME_FUTURE),
142             expected=0)
143
144     @mock.patch('arvados.events.EventClient.__init__')
145     def test_subscribe_poll_with_start_time_future(self, event_client_constr):
146         event_client_constr.side_effect = Exception('All is well')
147         self._test_subscribe(
148             poll_fallback=0.25, expect_type=arvados.events.PollClient,
149             start_time=self.localiso(self.TIME_FUTURE),
150             expected=0)
151
152     def test_subscribe_websocket_with_start_time_past_utc(self):
153         self._test_subscribe(
154             poll_fallback=False, expect_type=arvados.events.EventClient,
155             start_time=self.utciso(self.TIME_PAST),
156             expected=2)
157
158     def test_subscribe_websocket_with_start_time_future_utc(self):
159         self._test_subscribe(
160             poll_fallback=False, expect_type=arvados.events.EventClient,
161             start_time=self.utciso(self.TIME_FUTURE),
162             expected=0)
163
164     def utciso(self, t):
165         return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))
166
167     def localiso(self, t):
168         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone//60)
169
170     def isotz(self, offset):
171         """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
172         return '{:+03d}:{:02d}'.format(offset//60, offset%60)
173
174     # Test websocket reconnection on (un)expected close
175     def _test_websocket_reconnect(self, close_unexpected):
176         run_test_server.authorize_with('active')
177         events = queue.Queue(100)
178
179         logstream = tutil.StringIO()
180         rootLogger = logging.getLogger()
181         streamHandler = logging.StreamHandler(logstream)
182         rootLogger.addHandler(streamHandler)
183
184         filters = [['object_uuid', 'is_a', 'arvados#human']]
185         filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
186         self.ws = arvados.events.subscribe(
187             arvados.api('v1'), filters,
188             events.put_nowait,
189             poll_fallback=False,
190             last_log_id=None)
191         self.assertIsInstance(self.ws, arvados.events.EventClient)
192         self.assertEqual(200, events.get(True, 5)['status'])
193
194         # create obj
195         human = arvados.api('v1').humans().create(body={}).execute()
196
197         # expect an event
198         self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
199         with self.assertRaises(queue.Empty):
200             self.assertEqual(events.get(True, 2), None)
201
202         # close (im)properly
203         if close_unexpected:
204             self.ws.ec.close_connection()
205         else:
206             self.ws.close()
207
208         # create one more obj
209         human2 = arvados.api('v1').humans().create(body={}).execute()
210
211         # (un)expect the object creation event
212         if close_unexpected:
213             log_object_uuids = []
214             for i in range(0, 2):
215                 event = events.get(True, 5)
216                 if event.get('object_uuid') != None:
217                     log_object_uuids.append(event['object_uuid'])
218             with self.assertRaises(queue.Empty):
219                 self.assertEqual(events.get(True, 2), None)
220             self.assertNotIn(human['uuid'], log_object_uuids)
221             self.assertIn(human2['uuid'], log_object_uuids)
222         else:
223             with self.assertRaises(queue.Empty):
224                 self.assertEqual(events.get(True, 2), None)
225
226         # verify log message to ensure that an (un)expected close
227         log_messages = logstream.getvalue()
228         closeLogFound = log_messages.find("Unexpected close. Reconnecting.")
229         retryLogFound = log_messages.find("Error during websocket reconnect. Will retry")
230         if close_unexpected:
231             self.assertNotEqual(closeLogFound, -1)
232         else:
233             self.assertEqual(closeLogFound, -1)
234         rootLogger.removeHandler(streamHandler)
235
236     def test_websocket_reconnect_on_unexpected_close(self):
237         self._test_websocket_reconnect(True)
238
239     def test_websocket_no_reconnect_on_close_by_user(self):
240         self._test_websocket_reconnect(False)
241
242     # Test websocket reconnection retry
243     @mock.patch('arvados.events._EventClient.connect')
244     def test_websocket_reconnect_retry(self, event_client_connect):
245         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
246
247         logstream = tutil.StringIO()
248         rootLogger = logging.getLogger()
249         streamHandler = logging.StreamHandler(logstream)
250         rootLogger.addHandler(streamHandler)
251
252         run_test_server.authorize_with('active')
253         events = queue.Queue(100)
254
255         filters = [['object_uuid', 'is_a', 'arvados#human']]
256         self.ws = arvados.events.subscribe(
257             arvados.api('v1'), filters,
258             events.put_nowait,
259             poll_fallback=False,
260             last_log_id=None)
261         self.assertIsInstance(self.ws, arvados.events.EventClient)
262
263         # simulate improper close
264         self.ws.on_closed()
265
266         # verify log messages to ensure retry happened
267         log_messages = logstream.getvalue()
268         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
269         self.assertNotEqual(found, -1)
270         rootLogger.removeHandler(streamHandler)
271
272     @mock.patch('arvados.events._EventClient')
273     def test_subscribe_method(self, websocket_client):
274         filters = [['object_uuid', 'is_a', 'arvados#human']]
275         client = arvados.events.EventClient(
276             self.MOCK_WS_URL, [], lambda event: None, None)
277         client.subscribe(filters[:], 99)
278         websocket_client().subscribe.assert_called_with(filters, 99)
279
280     @mock.patch('arvados.events._EventClient')
281     def test_unsubscribe(self, websocket_client):
282         filters = [['object_uuid', 'is_a', 'arvados#human']]
283         client = arvados.events.EventClient(
284             self.MOCK_WS_URL, filters[:], lambda event: None, None)
285         client.unsubscribe(filters[:])
286         websocket_client().unsubscribe.assert_called_with(filters)
287
288     @mock.patch('arvados.events._EventClient')
289     def test_run_forever_survives_reconnects(self, websocket_client):
290         connected = threading.Event()
291         websocket_client().connect.side_effect = connected.set
292         client = arvados.events.EventClient(
293             self.MOCK_WS_URL, [], lambda event: None, None)
294         forever_thread = threading.Thread(target=client.run_forever)
295         forever_thread.start()
296         # Simulate an unexpected disconnect, and wait for reconnect.
297         close_thread = threading.Thread(target=client.on_closed)
298         close_thread.start()
299         self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
300         close_thread.join()
301         run_forever_alive = forever_thread.is_alive()
302         client.close()
303         forever_thread.join()
304         self.assertTrue(run_forever_alive)
305         self.assertEqual(2, websocket_client().connect.call_count)
306
307
class PollClientTestCase(unittest.TestCase):
    """Tests for arvados.events.PollClient run against a mocked API client."""

    # Seconds to wait on threading events before failing a test.
    TEST_TIMEOUT = 10.0
310
311     class MockLogs(object):
312
313         def __init__(self):
314             self.logs = []
315             self.lock = threading.Lock()
316             self.api_called = threading.Event()
317
318         def add(self, log):
319             with self.lock:
320                 self.logs.append(log)
321
322         def return_list(self, num_retries=None):
323             self.api_called.set()
324             args, kwargs = self.list_func.call_args_list[-1]
325             filters = kwargs.get('filters', [])
326             if not any(True for f in filters if f[0] == 'id' and f[1] == '>'):
327                 # No 'id' filter was given -- this must be the probe
328                 # to determine the most recent id.
329                 return {'items': [{'id': 1}], 'items_available': 1}
330             with self.lock:
331                 retval = self.logs
332                 self.logs = []
333             return {'items': retval, 'items_available': len(retval)}
334
335     def setUp(self):
336         self.logs = self.MockLogs()
337         self.arv = mock.MagicMock(name='arvados.api()')
338         self.arv.logs().list().execute.side_effect = self.logs.return_list
339         # our MockLogs object's "execute" stub will need to inspect
340         # the call history to determine X in
341         # ....logs().list(filters=X).execute():
342         self.logs.list_func = self.arv.logs().list
343         self.status_ok = threading.Event()
344         self.event_received = threading.Event()
345         self.recv_events = []
346
347     def tearDown(self):
348         if hasattr(self, 'client'):
349             self.client.close(timeout=None)
350
351     def callback(self, event):
352         if event.get('status') == 200:
353             self.status_ok.set()
354         else:
355             self.recv_events.append(event)
356             self.event_received.set()
357
358     def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
359         if filters is None:
360             filters = []
361         if callback is None:
362             callback = self.callback
363         self.client = arvados.events.PollClient(
364             self.arv, filters, callback, poll_time, last_log_id)
365
366     def was_filter_used(self, target):
367         return any(target in call[-1].get('filters', [])
368                    for call in self.arv.logs().list.call_args_list)
369
370     def test_callback(self):
371         test_log = {'id': 12345, 'testkey': 'testtext'}
372         self.logs.add({'id': 123})
373         self.build_client(poll_time=.01)
374         self.client.start()
375         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
376         self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
377         self.event_received.clear()
378         self.logs.add(test_log.copy())
379         self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
380         self.assertIn(test_log, self.recv_events)
381
382     def test_subscribe(self):
383         client_filter = ['kind', '=', 'arvados#test']
384         self.build_client()
385         self.client.unsubscribe([])
386         self.client.subscribe([client_filter[:]])
387         self.client.start()
388         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
389         self.assertTrue(self.logs.api_called.wait(self.TEST_TIMEOUT))
390         self.assertTrue(self.was_filter_used(client_filter))
391
392     def test_unsubscribe(self):
393         should_filter = ['foo', '=', 'foo']
394         should_not_filter = ['foo', '=', 'bar']
395         self.build_client(poll_time=0.01)
396         self.client.unsubscribe([])
397         self.client.subscribe([should_not_filter[:]])
398         self.client.subscribe([should_filter[:]])
399         self.client.unsubscribe([should_not_filter[:]])
400         self.client.start()
401         self.logs.add({'id': 123})
402         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
403         self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
404         self.assertTrue(self.was_filter_used(should_filter))
405         self.assertFalse(self.was_filter_used(should_not_filter))
406
407     def test_run_forever(self):
408         self.build_client()
409         self.client.start()
410         forever_thread = threading.Thread(target=self.client.run_forever)
411         forever_thread.start()
412         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
413         self.assertTrue(forever_thread.is_alive())
414         self.client.close()
415         forever_thread.join()
416         del self.client