11308: run-tests --skip python2
[arvados.git] / sdk / python / tests / test_events.py
1 from __future__ import print_function
2 from __future__ import absolute_import
3 from __future__ import division
4 from future import standard_library
5 standard_library.install_aliases()
6 from builtins import range
7 from builtins import object
8 import arvados
9 import io
10 import logging
11 import mock
12 import queue
13 from . import run_test_server
14 import threading
15 import time
16 import unittest
17
18 from . import arvados_testutil
19
20 class WebsocketTest(run_test_server.TestCaseWithServers):
21     MAIN_SERVER = {}
22
23     TIME_PAST = time.time()-3600
24     TIME_FUTURE = time.time()+3600
25     MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
26
27     TEST_TIMEOUT = 10.0
28
29     def setUp(self):
30         self.ws = None
31
32     def tearDown(self):
33         try:
34             if self.ws:
35                 self.ws.close()
36         except Exception as e:
37             print(("Error in teardown: ", e))
38         super(WebsocketTest, self).tearDown()
39         run_test_server.reset()
40
41     def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
42         run_test_server.authorize_with('active')
43         events = queue.Queue(100)
44
45         # Create ancestor before subscribing.
46         # When listening with start_time in the past, this should also be retrieved.
47         # However, when start_time is omitted in subscribe, this should not be fetched.
48         ancestor = arvados.api('v1').humans().create(body={}).execute()
49
50         filters = [['object_uuid', 'is_a', 'arvados#human']]
51         if start_time:
52             filters.append(['created_at', '>=', start_time])
53
54         self.ws = arvados.events.subscribe(
55             arvados.api('v1'), filters,
56             events.put_nowait,
57             poll_fallback=poll_fallback,
58             last_log_id=(1 if start_time else None))
59         self.assertIsInstance(self.ws, expect_type)
60         self.assertEqual(200, events.get(True, 5)['status'])
61         human = arvados.api('v1').humans().create(body={}).execute()
62
63         want_uuids = []
64         if expected > 0:
65             want_uuids.append(human['uuid'])
66         if expected > 1:
67             want_uuids.append(ancestor['uuid'])
68         log_object_uuids = []
69         while set(want_uuids) - set(log_object_uuids):
70             log_object_uuids.append(events.get(True, 5)['object_uuid'])
71
72         if expected < 2:
73             with self.assertRaises(queue.Empty):
74                 # assertEqual just serves to show us what unexpected
75                 # thing comes out of the queue when the assertRaises
76                 # fails; when the test passes, this assertEqual
77                 # doesn't get called.
78                 self.assertEqual(events.get(True, 2), None)
79
80     def test_subscribe_websocket(self):
81         self._test_subscribe(
82             poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)
83
84     @mock.patch('arvados.events.EventClient.__init__')
85     def test_subscribe_poll(self, event_client_constr):
86         event_client_constr.side_effect = Exception('All is well')
87         self._test_subscribe(
88             poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
89
90     def test_subscribe_poll_retry(self):
91         api_mock = mock.MagicMock()
92         n = []
93         def on_ev(ev):
94             n.append(ev)
95
96         error_mock = mock.MagicMock()
97         error_mock.resp.status = 0
98         error_mock._get_reason.return_value = "testing"
99         api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
100                                                       {"items": [{"id": 1}], "items_available": 1},
101                                                       arvados.errors.ApiError(error_mock, ""),
102                                                       {"items": [{"id": 1}], "items_available": 1})
103         pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
104         pc.start()
105         while len(n) < 2:
106             time.sleep(.1)
107         pc.close()
108
109     def test_subscribe_websocket_with_start_time_past(self):
110         self._test_subscribe(
111             poll_fallback=False, expect_type=arvados.events.EventClient,
112             start_time=self.localiso(self.TIME_PAST),
113             expected=2)
114
115     @mock.patch('arvados.events.EventClient.__init__')
116     def test_subscribe_poll_with_start_time_past(self, event_client_constr):
117         event_client_constr.side_effect = Exception('All is well')
118         self._test_subscribe(
119             poll_fallback=0.25, expect_type=arvados.events.PollClient,
120             start_time=self.localiso(self.TIME_PAST),
121             expected=2)
122
123     def test_subscribe_websocket_with_start_time_future(self):
124         self._test_subscribe(
125             poll_fallback=False, expect_type=arvados.events.EventClient,
126             start_time=self.localiso(self.TIME_FUTURE),
127             expected=0)
128
129     @mock.patch('arvados.events.EventClient.__init__')
130     def test_subscribe_poll_with_start_time_future(self, event_client_constr):
131         event_client_constr.side_effect = Exception('All is well')
132         self._test_subscribe(
133             poll_fallback=0.25, expect_type=arvados.events.PollClient,
134             start_time=self.localiso(self.TIME_FUTURE),
135             expected=0)
136
137     def test_subscribe_websocket_with_start_time_past_utc(self):
138         self._test_subscribe(
139             poll_fallback=False, expect_type=arvados.events.EventClient,
140             start_time=self.utciso(self.TIME_PAST),
141             expected=2)
142
143     def test_subscribe_websocket_with_start_time_future_utc(self):
144         self._test_subscribe(
145             poll_fallback=False, expect_type=arvados.events.EventClient,
146             start_time=self.utciso(self.TIME_FUTURE),
147             expected=0)
148
149     def utciso(self, t):
150         return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))
151
152     def localiso(self, t):
153         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone//60)
154
155     def isotz(self, offset):
156         """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
157         return '{:+03d}:{:02d}'.format(offset//60, offset%60)
158
159     # Test websocket reconnection on (un)execpted close
160     def _test_websocket_reconnect(self, close_unexpected):
161         run_test_server.authorize_with('active')
162         events = queue.Queue(100)
163
164         logstream = io.BytesIO()
165         rootLogger = logging.getLogger()
166         streamHandler = logging.StreamHandler(logstream)
167         rootLogger.addHandler(streamHandler)
168
169         filters = [['object_uuid', 'is_a', 'arvados#human']]
170         filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
171         self.ws = arvados.events.subscribe(
172             arvados.api('v1'), filters,
173             events.put_nowait,
174             poll_fallback=False,
175             last_log_id=None)
176         self.assertIsInstance(self.ws, arvados.events.EventClient)
177         self.assertEqual(200, events.get(True, 5)['status'])
178
179         # create obj
180         human = arvados.api('v1').humans().create(body={}).execute()
181
182         # expect an event
183         self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
184         with self.assertRaises(queue.Empty):
185             self.assertEqual(events.get(True, 2), None)
186
187         # close (im)properly
188         if close_unexpected:
189             self.ws.ec.close_connection()
190         else:
191             self.ws.close()
192
193         # create one more obj
194         human2 = arvados.api('v1').humans().create(body={}).execute()
195
196         # (un)expect the object creation event
197         if close_unexpected:
198             log_object_uuids = []
199             for i in range(0, 2):
200                 event = events.get(True, 5)
201                 if event.get('object_uuid') != None:
202                     log_object_uuids.append(event['object_uuid'])
203             with self.assertRaises(queue.Empty):
204                 self.assertEqual(events.get(True, 2), None)
205             self.assertNotIn(human['uuid'], log_object_uuids)
206             self.assertIn(human2['uuid'], log_object_uuids)
207         else:
208             with self.assertRaises(queue.Empty):
209                 self.assertEqual(events.get(True, 2), None)
210
211         # verify log message to ensure that an (un)expected close
212         log_messages = logstream.getvalue()
213         closeLogFound = log_messages.find("Unexpected close. Reconnecting.")
214         retryLogFound = log_messages.find("Error during websocket reconnect. Will retry")
215         if close_unexpected:
216             self.assertNotEqual(closeLogFound, -1)
217         else:
218             self.assertEqual(closeLogFound, -1)
219         rootLogger.removeHandler(streamHandler)
220
221     def test_websocket_reconnect_on_unexpected_close(self):
222         self._test_websocket_reconnect(True)
223
224     def test_websocket_no_reconnect_on_close_by_user(self):
225         self._test_websocket_reconnect(False)
226
227     # Test websocket reconnection retry
228     @mock.patch('arvados.events._EventClient.connect')
229     def test_websocket_reconnect_retry(self, event_client_connect):
230         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
231
232         logstream = io.BytesIO()
233         rootLogger = logging.getLogger()
234         streamHandler = logging.StreamHandler(logstream)
235         rootLogger.addHandler(streamHandler)
236
237         run_test_server.authorize_with('active')
238         events = queue.Queue(100)
239
240         filters = [['object_uuid', 'is_a', 'arvados#human']]
241         self.ws = arvados.events.subscribe(
242             arvados.api('v1'), filters,
243             events.put_nowait,
244             poll_fallback=False,
245             last_log_id=None)
246         self.assertIsInstance(self.ws, arvados.events.EventClient)
247
248         # simulate improper close
249         self.ws.on_closed()
250
251         # verify log messages to ensure retry happened
252         log_messages = logstream.getvalue()
253         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
254         self.assertNotEqual(found, -1)
255         rootLogger.removeHandler(streamHandler)
256
257     @mock.patch('arvados.events._EventClient')
258     def test_subscribe_method(self, websocket_client):
259         filters = [['object_uuid', 'is_a', 'arvados#human']]
260         client = arvados.events.EventClient(
261             self.MOCK_WS_URL, [], lambda event: None, None)
262         client.subscribe(filters[:], 99)
263         websocket_client().subscribe.assert_called_with(filters, 99)
264
265     @mock.patch('arvados.events._EventClient')
266     def test_unsubscribe(self, websocket_client):
267         filters = [['object_uuid', 'is_a', 'arvados#human']]
268         client = arvados.events.EventClient(
269             self.MOCK_WS_URL, filters[:], lambda event: None, None)
270         client.unsubscribe(filters[:])
271         websocket_client().unsubscribe.assert_called_with(filters)
272
273     @mock.patch('arvados.events._EventClient')
274     def test_run_forever_survives_reconnects(self, websocket_client):
275         connected = threading.Event()
276         websocket_client().connect.side_effect = connected.set
277         client = arvados.events.EventClient(
278             self.MOCK_WS_URL, [], lambda event: None, None)
279         forever_thread = threading.Thread(target=client.run_forever)
280         forever_thread.start()
281         # Simulate an unexpected disconnect, and wait for reconnect.
282         close_thread = threading.Thread(target=client.on_closed)
283         close_thread.start()
284         self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
285         close_thread.join()
286         run_forever_alive = forever_thread.is_alive()
287         client.close()
288         forever_thread.join()
289         self.assertTrue(run_forever_alive)
290         self.assertEqual(2, websocket_client().connect.call_count)
291
292
293 class PollClientTestCase(unittest.TestCase):
294     TEST_TIMEOUT = 10.0
295
296     class MockLogs(object):
297
298         def __init__(self):
299             self.logs = []
300             self.lock = threading.Lock()
301             self.api_called = threading.Event()
302
303         def add(self, log):
304             with self.lock:
305                 self.logs.append(log)
306
307         def return_list(self, num_retries=None):
308             self.api_called.set()
309             args, kwargs = self.list_func.call_args_list[-1]
310             filters = kwargs.get('filters', [])
311             if not any(True for f in filters if f[0] == 'id' and f[1] == '>'):
312                 # No 'id' filter was given -- this must be the probe
313                 # to determine the most recent id.
314                 return {'items': [{'id': 1}], 'items_available': 1}
315             with self.lock:
316                 retval = self.logs
317                 self.logs = []
318             return {'items': retval, 'items_available': len(retval)}
319
320     def setUp(self):
321         self.logs = self.MockLogs()
322         self.arv = mock.MagicMock(name='arvados.api()')
323         self.arv.logs().list().execute.side_effect = self.logs.return_list
324         # our MockLogs object's "execute" stub will need to inspect
325         # the call history to determine X in
326         # ....logs().list(filters=X).execute():
327         self.logs.list_func = self.arv.logs().list
328         self.status_ok = threading.Event()
329         self.event_received = threading.Event()
330         self.recv_events = []
331
332     def tearDown(self):
333         if hasattr(self, 'client'):
334             self.client.close(timeout=None)
335
336     def callback(self, event):
337         if event.get('status') == 200:
338             self.status_ok.set()
339         else:
340             self.recv_events.append(event)
341             self.event_received.set()
342
343     def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
344         if filters is None:
345             filters = []
346         if callback is None:
347             callback = self.callback
348         self.client = arvados.events.PollClient(
349             self.arv, filters, callback, poll_time, last_log_id)
350
351     def was_filter_used(self, target):
352         return any(target in call[-1].get('filters', [])
353                    for call in self.arv.logs().list.call_args_list)
354
355     def test_callback(self):
356         test_log = {'id': 12345, 'testkey': 'testtext'}
357         self.logs.add({'id': 123})
358         self.build_client(poll_time=.01)
359         self.client.start()
360         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
361         self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
362         self.event_received.clear()
363         self.logs.add(test_log.copy())
364         self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
365         self.assertIn(test_log, self.recv_events)
366
367     def test_subscribe(self):
368         client_filter = ['kind', '=', 'arvados#test']
369         self.build_client()
370         self.client.unsubscribe([])
371         self.client.subscribe([client_filter[:]])
372         self.client.start()
373         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
374         self.assertTrue(self.logs.api_called.wait(self.TEST_TIMEOUT))
375         self.assertTrue(self.was_filter_used(client_filter))
376
377     def test_unsubscribe(self):
378         should_filter = ['foo', '=', 'foo']
379         should_not_filter = ['foo', '=', 'bar']
380         self.build_client(poll_time=0.01)
381         self.client.unsubscribe([])
382         self.client.subscribe([should_not_filter[:]])
383         self.client.subscribe([should_filter[:]])
384         self.client.unsubscribe([should_not_filter[:]])
385         self.client.start()
386         self.logs.add({'id': 123})
387         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
388         self.assertTrue(self.event_received.wait(self.TEST_TIMEOUT))
389         self.assertTrue(self.was_filter_used(should_filter))
390         self.assertFalse(self.was_filter_used(should_not_filter))
391
392     def test_run_forever(self):
393         self.build_client()
394         self.client.start()
395         forever_thread = threading.Thread(target=self.client.run_forever)
396         forever_thread.start()
397         self.assertTrue(self.status_ok.wait(self.TEST_TIMEOUT))
398         self.assertTrue(forever_thread.is_alive())
399         self.client.close()
400         forever_thread.join()
401         del self.client