Merge branch 'master' into 10293-cwl-cr-output
[arvados.git] / sdk / python / tests / test_events.py
1 import arvados
2 import io
3 import logging
4 import mock
5 import Queue
6 import run_test_server
7 import threading
8 import time
9 import unittest
10
11 import arvados_testutil
12
class WebsocketTest(run_test_server.TestCaseWithServers):
    """Integration tests for arvados.events.subscribe() and EventClient.

    Most tests create "human" objects through the API and check which log
    events are delivered to a subscriber, either over a websocket
    (EventClient) or through the polling fallback (PollClient).
    """

    # NOTE(review): presumably asks TestCaseWithServers to start the API
    # server with default settings -- confirm in run_test_server.
    MAIN_SERVER = {}

    # One hour before/after the moment this class body is evaluated; used
    # as 'created_at' filter bounds.
    TIME_PAST = time.time()-3600
    TIME_FUTURE = time.time()+3600
    MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)

    # Upper bound, in seconds, for waits on background threads.
    TEST_TIMEOUT = 10.0

    def setUp(self):
        # tearDown() closes self.ws if a test assigned a client to it.
        self.ws = None

    def tearDown(self):
        """Close any client the test left open, then reset the test server."""
        try:
            if self.ws:
                self.ws.close()
        except Exception as e:
            # Best effort: a failing close must not mask the test result.
            # Format explicitly so Python 2 does not print a tuple.
            print("Error in teardown: {}".format(e))
        super(WebsocketTest, self).tearDown()
        run_test_server.reset()

    def _capture_root_log(self):
        """Attach an in-memory handler to the root logger.

        The handler is removed by a unittest cleanup, so it does not leak
        into later tests even when the calling test fails part-way.

        Returns the stream the handler writes to.
        """
        logstream = io.BytesIO()
        handler = logging.StreamHandler(logstream)
        root_logger = logging.getLogger()
        root_logger.addHandler(handler)
        self.addCleanup(root_logger.removeHandler, handler)
        return logstream

    def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
        """Subscribe to human-object events and check what gets delivered.

        poll_fallback -- passed through to arvados.events.subscribe().
        expect_type -- class the returned client must be an instance of.
        start_time -- optional timestamp string; when given, a
            'created_at >= start_time' filter is added and last_log_id=1
            is passed to subscribe().
        expected -- number of object events expected after the initial
            status-200 event (0, 1, or 2).
        """
        run_test_server.authorize_with('active')
        events = Queue.Queue(100)

        # Create ancestor before subscribing.
        # When listening with start_time in the past, this should also be retrieved.
        # However, when start_time is omitted in subscribe, this should not be fetched.
        ancestor = arvados.api('v1').humans().create(body={}).execute()

        filters = [['object_uuid', 'is_a', 'arvados#human']]
        if start_time:
            filters.append(['created_at', '>=', start_time])

        self.ws = arvados.events.subscribe(
            arvados.api('v1'), filters,
            events.put_nowait,
            poll_fallback=poll_fallback,
            last_log_id=(1 if start_time else None))
        self.assertIsInstance(self.ws, expect_type)
        # The first message on the queue is the subscription status.
        self.assertEqual(200, events.get(True, 5)['status'])
        human = arvados.api('v1').humans().create(body={}).execute()

        # Each get() waits up to 5 seconds and raises Queue.Empty if fewer
        # events than expected arrive.
        log_object_uuids = [events.get(True, 5)['object_uuid']
                            for _ in range(expected)]

        if expected > 0:
            self.assertIn(human['uuid'], log_object_uuids)

        if expected > 1:
            self.assertIn(ancestor['uuid'], log_object_uuids)

        with self.assertRaises(Queue.Empty):
            # assertEqual just serves to show us what unexpected thing
            # comes out of the queue when the assertRaises fails; when
            # the test passes, this assertEqual doesn't get called.
            self.assertEqual(events.get(True, 2), None)

    def test_subscribe_websocket(self):
        """Without a start time, only the post-subscribe object is reported."""
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)

    @mock.patch('arvados.events.EventClient.__init__')
    def test_subscribe_poll(self, event_client_constr):
        """subscribe() returns a PollClient when EventClient cannot be built."""
        event_client_constr.side_effect = Exception('All is well')
        self._test_subscribe(
            poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)

    def test_subscribe_poll_retry(self):
        """PollClient keeps delivering events when list calls raise ApiError."""
        api_mock = mock.MagicMock()
        n = []
        def on_ev(ev):
            n.append(ev)

        error_mock = mock.MagicMock()
        error_mock.resp.status = 0
        error_mock._get_reason.return_value = "testing"
        # Alternate failures and successful responses.
        api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
                                                      {"items": [{"id": 1}], "items_available": 1},
                                                      arvados.errors.ApiError(error_mock, ""),
                                                      {"items": [{"id": 1}], "items_available": 1})
        poll_time = 15
        pc = arvados.events.PollClient(api_mock, [], on_ev, poll_time, None)
        pc.start()
        try:
            # Bound the wait so a regression fails the test instead of
            # hanging the whole run.  A few poll periods is a generous
            # allowance -- NOTE(review): tune if PollClient's retry
            # backoff makes this too tight or too loose.
            deadline = time.time() + 4 * poll_time
            while len(n) < 2 and time.time() < deadline:
                time.sleep(.1)
        finally:
            # Always stop the client, even if the wait was interrupted.
            pc.close()
        self.assertGreaterEqual(
            len(n), 2, "timed out waiting for events after API errors")

    def test_subscribe_websocket_with_start_time_past(self):
        """A past start time (local zone) also reports the earlier object."""
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.localiso(self.TIME_PAST),
            expected=2)

    @mock.patch('arvados.events.EventClient.__init__')
    def test_subscribe_poll_with_start_time_past(self, event_client_constr):
        """Polling fallback with a past start time reports both objects."""
        event_client_constr.side_effect = Exception('All is well')
        self._test_subscribe(
            poll_fallback=0.25, expect_type=arvados.events.PollClient,
            start_time=self.localiso(self.TIME_PAST),
            expected=2)

    def test_subscribe_websocket_with_start_time_future(self):
        """A future start time (local zone) filters out every event."""
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.localiso(self.TIME_FUTURE),
            expected=0)

    @mock.patch('arvados.events.EventClient.__init__')
    def test_subscribe_poll_with_start_time_future(self, event_client_constr):
        """Polling fallback with a future start time reports nothing."""
        event_client_constr.side_effect = Exception('All is well')
        self._test_subscribe(
            poll_fallback=0.25, expect_type=arvados.events.PollClient,
            start_time=self.localiso(self.TIME_FUTURE),
            expected=0)

    def test_subscribe_websocket_with_start_time_past_utc(self):
        """Same as the past-start-time test, with a UTC ('Z') timestamp."""
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.utciso(self.TIME_PAST),
            expected=2)

    def test_subscribe_websocket_with_start_time_future_utc(self):
        """Same as the future-start-time test, with a UTC ('Z') timestamp."""
        self._test_subscribe(
            poll_fallback=False, expect_type=arvados.events.EventClient,
            start_time=self.utciso(self.TIME_FUTURE),
            expected=0)

    def utciso(self, t):
        """Format epoch seconds t as an ISO8601 UTC timestamp ending in 'Z'."""
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))

    def localiso(self, t):
        """Format epoch seconds t as ISO8601 local time with a numeric offset."""
        local = time.localtime(t)
        # time.timezone is the non-DST offset; while DST is in effect for
        # this timestamp the matching offset is time.altzone.  Both are
        # expressed in seconds *west* of UTC, hence the negation.
        seconds_west = time.altzone if local.tm_isdst > 0 else time.timezone
        return (time.strftime('%Y-%m-%dT%H:%M:%S', local)
                + self.isotz(-seconds_west // 60))

    def isotz(self, offset):
        """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
        # Split the magnitude, not the signed value: floor division of a
        # negative offset would turn -210 minutes into "-0430" instead of
        # "-0330".
        sign = '-' if offset < 0 else '+'
        hours, minutes = divmod(abs(int(offset)), 60)
        return '{}{:02d}{:02d}'.format(sign, hours, minutes)

    # Test websocket reconnection on (un)expected close
    def _test_websocket_reconnect(self, close_unexpected):
        """Check event delivery and logging after the websocket is closed.

        close_unexpected -- when true, the underlying connection is closed
            via self.ws.ec.close_connection() and the client is expected
            to log a reconnect and report the next object; when false,
            the client is closed with close() and no further events or
            reconnect log message are expected.
        """
        run_test_server.authorize_with('active')
        events = Queue.Queue(100)

        logstream = self._capture_root_log()

        filters = [['object_uuid', 'is_a', 'arvados#human']]
        filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
        self.ws = arvados.events.subscribe(
            arvados.api('v1'), filters,
            events.put_nowait,
            poll_fallback=False,
            last_log_id=None)
        self.assertIsInstance(self.ws, arvados.events.EventClient)
        self.assertEqual(200, events.get(True, 5)['status'])

        # create obj
        human = arvados.api('v1').humans().create(body={}).execute()

        # expect an event
        self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
        with self.assertRaises(Queue.Empty):
            self.assertEqual(events.get(True, 2), None)

        # close (im)properly
        if close_unexpected:
            self.ws.ec.close_connection()
        else:
            self.ws.close()

        # create one more obj
        human2 = arvados.api('v1').humans().create(body={}).execute()

        # (un)expect the object creation event
        if close_unexpected:
            # Two messages are read; only those carrying an object_uuid
            # are recorded (a message without one is skipped).
            log_object_uuids = []
            for i in range(0, 2):
                event = events.get(True, 5)
                if event.get('object_uuid') is not None:
                    log_object_uuids.append(event['object_uuid'])
            with self.assertRaises(Queue.Empty):
                self.assertEqual(events.get(True, 2), None)
            self.assertNotIn(human['uuid'], log_object_uuids)
            self.assertIn(human2['uuid'], log_object_uuids)
        else:
            with self.assertRaises(Queue.Empty):
                self.assertEqual(events.get(True, 2), None)

        # verify the reconnect message was logged only for an unexpected close
        log_messages = logstream.getvalue()
        if close_unexpected:
            self.assertIn("Unexpected close. Reconnecting.", log_messages)
        else:
            self.assertNotIn("Unexpected close. Reconnecting.", log_messages)

    def test_websocket_reconnect_on_unexpected_close(self):
        """The client reconnects after the connection drops underneath it."""
        self._test_websocket_reconnect(True)

    def test_websocket_no_reconnect_on_close_by_user(self):
        """The client does not reconnect after an explicit close()."""
        self._test_websocket_reconnect(False)

    # Test websocket reconnection retry
    @mock.patch('arvados.events._EventClient.connect')
    def test_websocket_reconnect_retry(self, event_client_connect):
        """A failed reconnect attempt is logged and followed by a retry."""
        # First connect succeeds, the second raises, the third succeeds.
        event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]

        logstream = self._capture_root_log()

        run_test_server.authorize_with('active')
        events = Queue.Queue(100)

        filters = [['object_uuid', 'is_a', 'arvados#human']]
        self.ws = arvados.events.subscribe(
            arvados.api('v1'), filters,
            events.put_nowait,
            poll_fallback=False,
            last_log_id=None)
        self.assertIsInstance(self.ws, arvados.events.EventClient)

        # simulate improper close
        self.ws.on_closed()

        # verify log messages to ensure retry happened
        self.assertIn(
            "Error 'EventClient.connect error' during websocket reconnect.",
            logstream.getvalue())

    @mock.patch('arvados.events._EventClient')
    def test_subscribe_method(self, websocket_client):
        """EventClient.subscribe() forwards its arguments to _EventClient."""
        filters = [['object_uuid', 'is_a', 'arvados#human']]
        client = arvados.events.EventClient(
            self.MOCK_WS_URL, [], lambda event: None, None)
        # Pass a copy so the assertion compares against an untouched list.
        client.subscribe(filters[:], 99)
        websocket_client().subscribe.assert_called_with(filters, 99)

    @mock.patch('arvados.events._EventClient')
    def test_unsubscribe(self, websocket_client):
        """EventClient.unsubscribe() forwards its filters to _EventClient."""
        filters = [['object_uuid', 'is_a', 'arvados#human']]
        client = arvados.events.EventClient(
            self.MOCK_WS_URL, filters[:], lambda event: None, None)
        client.unsubscribe(filters[:])
        websocket_client().unsubscribe.assert_called_with(filters)

    @mock.patch('arvados.events._EventClient')
    def test_run_forever_survives_reconnects(self, websocket_client):
        """run_forever() keeps running while the client reconnects."""
        connected = threading.Event()
        # Every connect() on the mocked _EventClient sets the event.
        websocket_client().connect.side_effect = connected.set
        client = arvados.events.EventClient(
            self.MOCK_WS_URL, [], lambda event: None, None)
        forever_thread = threading.Thread(target=client.run_forever)
        forever_thread.start()
        # Simulate an unexpected disconnect, and wait for reconnect.
        close_thread = threading.Thread(target=client.on_closed)
        close_thread.start()
        self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
        close_thread.join()
        # Sample liveness before close(), which is what ends run_forever().
        run_forever_alive = forever_thread.is_alive()
        client.close()
        forever_thread.join()
        self.assertTrue(run_forever_alive)
        # NOTE(review): presumably one connect at construction plus one
        # for the reconnect -- confirm against EventClient.
        self.assertEqual(2, websocket_client().connect.call_count)
283
284
class PollClientTestCase(unittest.TestCase):
    """Tests for arvados.events.PollClient using a mocked API client."""

    # Longest time, in seconds, to wait for the callback to fire.
    TEST_TIMEOUT = 10.0

    class MockLogs(object):
        """Lock-protected buffer of log records handed out by the mock API."""

        def __init__(self):
            self.logs = []
            self.lock = threading.Lock()

        def add(self, log):
            """Queue one log record for the next return_list() call."""
            with self.lock:
                self.logs.append(log)

        def return_list(self, num_retries=None):
            """Hand back all queued records as a list response and clear them."""
            with self.lock:
                drained, self.logs = self.logs, []
            return {'items': drained, 'items_available': len(drained)}

    def setUp(self):
        self.logs = self.MockLogs()
        self.arv = mock.MagicMock(name='arvados.api()')
        # Every logs().list().execute() call drains the MockLogs buffer.
        execute_mock = self.arv.logs().list().execute
        execute_mock.side_effect = self.logs.return_list
        self.callback_called = threading.Event()
        self.recv_events = []

    def tearDown(self):
        # Not every test builds a client, so close one only if it exists.
        client = getattr(self, 'client', None)
        if client is not None:
            client.close(timeout=None)

    def callback(self, event):
        """Default event handler: record the event and flag its arrival."""
        self.recv_events.append(event)
        self.callback_called.set()

    def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
        """Create self.client, defaulting to no filters and self.callback."""
        self.client = arvados.events.PollClient(
            self.arv,
            [] if filters is None else filters,
            self.callback if callback is None else callback,
            poll_time,
            last_log_id)

    def was_filter_used(self, target):
        """Return True if any recorded logs().list() call's filters contain target."""
        for recorded_call in self.arv.logs().list.call_args_list:
            # The last element of a recorded call is its keyword arguments.
            keyword_args = recorded_call[-1]
            if target in keyword_args.get('filters', []):
                return True
        return False

    def test_callback(self):
        """Records added after start-up are delivered to the callback."""
        expected_log = {'id': 12345, 'testkey': 'testtext'}
        self.logs.add({'id': 123})
        self.build_client(poll_time=.01)
        self.client.start()
        first_seen = self.callback_called.wait(self.TEST_TIMEOUT)
        self.assertTrue(first_seen)
        # Re-arm the event so the next wait only sees a later callback.
        self.callback_called.clear()
        self.logs.add(expected_log.copy())
        second_seen = self.callback_called.wait(self.TEST_TIMEOUT)
        self.assertTrue(second_seen)
        self.client.close(timeout=None)
        self.assertIn(expected_log, self.recv_events)

    def test_subscribe(self):
        """A subscribed filter shows up in the list calls the client makes."""
        wanted = ['kind', '=', 'arvados#test']
        self.build_client()
        self.client.subscribe([wanted[:]])
        self.client.start()
        callback_fired = self.callback_called.wait(self.TEST_TIMEOUT)
        self.assertTrue(callback_fired)
        self.client.close(timeout=None)
        self.assertTrue(self.was_filter_used(wanted))

    def test_unsubscribe(self):
        """A filter removed before start is never sent to the API."""
        unwanted = ['kind', '=', 'arvados#test']
        self.build_client()
        self.client.subscribe([unwanted[:]])
        self.client.unsubscribe([unwanted[:]])
        self.client.start()
        self.client.close(timeout=None)
        self.assertFalse(self.was_filter_used(unwanted))

    def test_run_forever(self):
        """run_forever() keeps running until the client is closed."""
        self.build_client()
        self.client.start()
        runner = threading.Thread(target=self.client.run_forever)
        runner.start()
        callback_fired = self.callback_called.wait(self.TEST_TIMEOUT)
        self.assertTrue(callback_fired)
        self.assertTrue(runner.is_alive())
        self.client.close()
        runner.join()