Merge branch '8998-optimize-decode-www-form-component' closes #8998
[arvados.git] / sdk / python / tests / test_websockets.py
1 import arvados
2 import arvados.events
3 import arvados.errors
4 from datetime import datetime, timedelta, tzinfo
5 import logging
6 import logging.handlers
7 import mock
8 import Queue
9 import run_test_server
10 import StringIO
11 import tempfile
12 import threading
13 import time
14 import unittest
15
16 class WebsocketTest(run_test_server.TestCaseWithServers):
17     MAIN_SERVER = {}
18
19     TIME_PAST = time.time()-3600
20     TIME_FUTURE = time.time()+3600
21
22     def setUp(self):
23         self.ws = None
24
25     def tearDown(self):
26         try:
27             if self.ws:
28                 self.ws.close()
29         except Exception as e:
30             print("Error in teardown: ", e)
31         super(WebsocketTest, self).tearDown()
32         run_test_server.reset()
33
34     def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
35         run_test_server.authorize_with('active')
36         events = Queue.Queue(100)
37
38         # Create ancestor before subscribing.
39         # When listening with start_time in the past, this should also be retrieved.
40         # However, when start_time is omitted in subscribe, this should not be fetched.
41         ancestor = arvados.api('v1').humans().create(body={}).execute()
42
43         filters = [['object_uuid', 'is_a', 'arvados#human']]
44         if start_time:
45             filters.append(['created_at', '>=', start_time])
46
47         self.ws = arvados.events.subscribe(
48             arvados.api('v1'), filters,
49             events.put_nowait,
50             poll_fallback=poll_fallback,
51             last_log_id=(1 if start_time else None))
52         self.assertIsInstance(self.ws, expect_type)
53         self.assertEqual(200, events.get(True, 5)['status'])
54         human = arvados.api('v1').humans().create(body={}).execute()
55
56         log_object_uuids = []
57         for i in range(0, expected):
58             log_object_uuids.append(events.get(True, 5)['object_uuid'])
59
60         if expected > 0:
61             self.assertIn(human['uuid'], log_object_uuids)
62
63         if expected > 1:
64             self.assertIn(ancestor['uuid'], log_object_uuids)
65
66         with self.assertRaises(Queue.Empty):
67             # assertEqual just serves to show us what unexpected thing
68             # comes out of the queue when the assertRaises fails; when
69             # the test passes, this assertEqual doesn't get called.
70             self.assertEqual(events.get(True, 2), None)
71
72     def test_subscribe_websocket(self):
73         self._test_subscribe(
74             poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)
75
76     @mock.patch('arvados.events.EventClient.__init__')
77     def test_subscribe_poll(self, event_client_constr):
78         event_client_constr.side_effect = Exception('All is well')
79         self._test_subscribe(
80             poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
81
82     def test_subscribe_poll_retry(self):
83         api_mock = mock.MagicMock()
84         n = []
85         def on_ev(ev):
86             n.append(ev)
87
88         error_mock = mock.MagicMock()
89         error_mock.resp.status = 0
90         error_mock._get_reason.return_value = "testing"
91         api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
92                                                       {"items": [{"id": 1}], "items_available": 1},
93                                                       arvados.errors.ApiError(error_mock, ""),
94                                                       {"items": [{"id": 1}], "items_available": 1})
95         pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
96         pc.start()
97         while len(n) < 2:
98             time.sleep(.1)
99         pc.close()
100
101     def test_subscribe_websocket_with_start_time_past(self):
102         self._test_subscribe(
103             poll_fallback=False, expect_type=arvados.events.EventClient,
104             start_time=self.localiso(self.TIME_PAST),
105             expected=2)
106
107     @mock.patch('arvados.events.EventClient.__init__')
108     def test_subscribe_poll_with_start_time_past(self, event_client_constr):
109         event_client_constr.side_effect = Exception('All is well')
110         self._test_subscribe(
111             poll_fallback=0.25, expect_type=arvados.events.PollClient,
112             start_time=self.localiso(self.TIME_PAST),
113             expected=2)
114
115     def test_subscribe_websocket_with_start_time_future(self):
116         self._test_subscribe(
117             poll_fallback=False, expect_type=arvados.events.EventClient,
118             start_time=self.localiso(self.TIME_FUTURE),
119             expected=0)
120
121     @mock.patch('arvados.events.EventClient.__init__')
122     def test_subscribe_poll_with_start_time_future(self, event_client_constr):
123         event_client_constr.side_effect = Exception('All is well')
124         self._test_subscribe(
125             poll_fallback=0.25, expect_type=arvados.events.PollClient,
126             start_time=self.localiso(self.TIME_FUTURE),
127             expected=0)
128
129     def test_subscribe_websocket_with_start_time_past_utc(self):
130         self._test_subscribe(
131             poll_fallback=False, expect_type=arvados.events.EventClient,
132             start_time=self.utciso(self.TIME_PAST),
133             expected=2)
134
135     def test_subscribe_websocket_with_start_time_future_utc(self):
136         self._test_subscribe(
137             poll_fallback=False, expect_type=arvados.events.EventClient,
138             start_time=self.utciso(self.TIME_FUTURE),
139             expected=0)
140
141     def utciso(self, t):
142         return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))
143
144     def localiso(self, t):
145         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
146
147     def isotz(self, offset):
148         """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
149         return '{:+03d}{:02d}'.format(offset/60, offset%60)
150
151     # Test websocket reconnection on (un)execpted close
152     def _test_websocket_reconnect(self, close_unexpected):
153         run_test_server.authorize_with('active')
154         events = Queue.Queue(100)
155
156         logstream = StringIO.StringIO()
157         rootLogger = logging.getLogger()
158         streamHandler = logging.StreamHandler(logstream)
159         rootLogger.addHandler(streamHandler)
160
161         filters = [['object_uuid', 'is_a', 'arvados#human']]
162         filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
163         self.ws = arvados.events.subscribe(
164             arvados.api('v1'), filters,
165             events.put_nowait,
166             poll_fallback=False,
167             last_log_id=None)
168         self.assertIsInstance(self.ws, arvados.events.EventClient)
169         self.assertEqual(200, events.get(True, 5)['status'])
170
171         # create obj
172         human = arvados.api('v1').humans().create(body={}).execute()
173
174         # expect an event
175         self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
176         with self.assertRaises(Queue.Empty):
177             self.assertEqual(events.get(True, 2), None)
178
179         # close (im)properly
180         if close_unexpected:
181             self.ws.close_connection()
182         else:
183             self.ws.close()
184
185         # create one more obj
186         human2 = arvados.api('v1').humans().create(body={}).execute()
187
188         # (un)expect the object creation event
189         if close_unexpected:
190             log_object_uuids = []
191             for i in range(0, 2):
192                 event = events.get(True, 5)
193                 if event.get('object_uuid') != None:
194                     log_object_uuids.append(event['object_uuid'])
195             with self.assertRaises(Queue.Empty):
196                 self.assertEqual(events.get(True, 2), None)
197             self.assertNotIn(human['uuid'], log_object_uuids)
198             self.assertIn(human2['uuid'], log_object_uuids)
199         else:
200             with self.assertRaises(Queue.Empty):
201                 self.assertEqual(events.get(True, 2), None)
202
203         # verify log message to ensure that an (un)expected close
204         log_messages = logstream.getvalue()
205         closeLogFound = log_messages.find("Unexpected close. Reconnecting.")
206         retryLogFound = log_messages.find("Error during websocket reconnect. Will retry")
207         if close_unexpected:
208             self.assertNotEqual(closeLogFound, -1)
209         else:
210             self.assertEqual(closeLogFound, -1)
211         rootLogger.removeHandler(streamHandler)
212
213     def test_websocket_reconnect_on_unexpected_close(self):
214         self._test_websocket_reconnect(True)
215
216     def test_websocket_no_reconnect_on_close_by_user(self):
217         self._test_websocket_reconnect(False)
218
219     # Test websocket reconnection retry
220     @mock.patch('arvados.events._EventClient.connect')
221     def test_websocket_reconnect_retry(self, event_client_connect):
222         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
223
224         logstream = StringIO.StringIO()
225         rootLogger = logging.getLogger()
226         streamHandler = logging.StreamHandler(logstream)
227         rootLogger.addHandler(streamHandler)
228
229         run_test_server.authorize_with('active')
230         events = Queue.Queue(100)
231
232         filters = [['object_uuid', 'is_a', 'arvados#human']]
233         self.ws = arvados.events.subscribe(
234             arvados.api('v1'), filters,
235             events.put_nowait,
236             poll_fallback=False,
237             last_log_id=None)
238         self.assertIsInstance(self.ws, arvados.events.EventClient)
239
240         # simulate improper close
241         self.ws.on_closed()
242
243         # verify log messages to ensure retry happened
244         log_messages = logstream.getvalue()
245         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
246         self.assertNotEqual(found, -1)
247         rootLogger.removeHandler(streamHandler)