7658: Clean up & handle subscription filters consistently across EventClient,
[arvados.git] / sdk / python / tests / test_websockets.py
1 import arvados
2 import arvados.events
3 from datetime import datetime, timedelta, tzinfo
4 import logging
5 import logging.handlers
6 import mock
7 import Queue
8 import run_test_server
9 import tempfile
10 import threading
11 import time
12 import unittest
13
14 class WebsocketTest(run_test_server.TestCaseWithServers):
15     MAIN_SERVER = {}
16
17     TIME_PAST = time.time()-3600
18     TIME_FUTURE = time.time()+3600
19
20     def setUp(self):
21         self.ws = None
22
23     def tearDown(self):
24         try:
25             if self.ws:
26                 self.ws.close()
27         except Exception as e:
28             print("Error in teardown: ", e)
29         super(WebsocketTest, self).tearDown()
30         run_test_server.reset()
31
32     def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
33         run_test_server.authorize_with('active')
34         events = Queue.Queue(100)
35
36         # Create ancestor before subscribing.
37         # When listening with start_time in the past, this should also be retrieved.
38         # However, when start_time is omitted in subscribe, this should not be fetched.
39         ancestor = arvados.api('v1').humans().create(body={}).execute()
40
41         filters = [['object_uuid', 'is_a', 'arvados#human']]
42         if start_time:
43             filters.append(['created_at', '>=', start_time])
44
45         self.ws = arvados.events.subscribe(
46             arvados.api('v1'), filters,
47             events.put_nowait,
48             poll_fallback=poll_fallback,
49             last_log_id=(1 if start_time else None))
50         self.assertIsInstance(self.ws, expect_type)
51         self.assertEqual(200, events.get(True, 5)['status'])
52         human = arvados.api('v1').humans().create(body={}).execute()
53
54         log_object_uuids = []
55         for i in range(0, expected):
56             log_object_uuids.append(events.get(True, 5)['object_uuid'])
57
58         if expected > 0:
59             self.assertIn(human['uuid'], log_object_uuids)
60
61         if expected > 1:
62             self.assertIn(ancestor['uuid'], log_object_uuids)
63
64         with self.assertRaises(Queue.Empty):
65             # assertEqual just serves to show us what unexpected thing
66             # comes out of the queue when the assertRaises fails; when
67             # the test passes, this assertEqual doesn't get called.
68             self.assertEqual(events.get(True, 2), None)
69
70     def test_subscribe_websocket(self):
71         self._test_subscribe(
72             poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)
73
74     @mock.patch('arvados.events.EventClient.__init__')
75     def test_subscribe_poll(self, event_client_constr):
76         event_client_constr.side_effect = Exception('All is well')
77         self._test_subscribe(
78             poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
79
80     def test_subscribe_websocket_with_start_time_past(self):
81         self._test_subscribe(
82             poll_fallback=False, expect_type=arvados.events.EventClient,
83             start_time=self.localiso(self.TIME_PAST),
84             expected=2)
85
86     @mock.patch('arvados.events.EventClient.__init__')
87     def test_subscribe_poll_with_start_time_past(self, event_client_constr):
88         event_client_constr.side_effect = Exception('All is well')
89         self._test_subscribe(
90             poll_fallback=0.25, expect_type=arvados.events.PollClient,
91             start_time=self.localiso(self.TIME_PAST),
92             expected=2)
93
94     def test_subscribe_websocket_with_start_time_future(self):
95         self._test_subscribe(
96             poll_fallback=False, expect_type=arvados.events.EventClient,
97             start_time=self.localiso(self.TIME_FUTURE),
98             expected=0)
99
100     @mock.patch('arvados.events.EventClient.__init__')
101     def test_subscribe_poll_with_start_time_future(self, event_client_constr):
102         event_client_constr.side_effect = Exception('All is well')
103         self._test_subscribe(
104             poll_fallback=0.25, expect_type=arvados.events.PollClient,
105             start_time=self.localiso(self.TIME_FUTURE),
106             expected=0)
107
108     def test_subscribe_websocket_with_start_time_past_utc(self):
109         self._test_subscribe(
110             poll_fallback=False, expect_type=arvados.events.EventClient,
111             start_time=self.utciso(self.TIME_PAST),
112             expected=2)
113
114     def test_subscribe_websocket_with_start_time_future_utc(self):
115         self._test_subscribe(
116             poll_fallback=False, expect_type=arvados.events.EventClient,
117             start_time=self.utciso(self.TIME_FUTURE),
118             expected=0)
119
120     def utciso(self, t):
121         return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))
122
123     def localiso(self, t):
124         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
125
126     def isotz(self, offset):
127         """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
128         return '{:+03d}{:02d}'.format(offset/60, offset%60)
129
130     # Test websocket reconnection on (un)execpted close
131     def _test_websocket_reconnect(self, close_unexpected):
132         run_test_server.authorize_with('active')
133         events = Queue.Queue(100)
134
135         log_file = tempfile.NamedTemporaryFile(suffix='log.out', delete=True)
136         rootLogger = logging.getLogger()
137         fileHandler = logging.FileHandler(log_file.name)
138         rootLogger.addHandler(fileHandler)
139
140         filters = [['object_uuid', 'is_a', 'arvados#human']]
141         filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
142         self.ws = arvados.events.subscribe(
143             arvados.api('v1'), filters,
144             events.put_nowait,
145             poll_fallback=False,
146             last_log_id=None)
147         self.assertIsInstance(self.ws, arvados.events.EventClient)
148         self.assertEqual(200, events.get(True, 5)['status'])
149
150         # create obj
151         human = arvados.api('v1').humans().create(body={}).execute()
152
153         # expect an event
154         self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
155         with self.assertRaises(Queue.Empty):
156             self.assertEqual(events.get(True, 2), None)
157
158         # close (im)properly
159         if close_unexpected:
160             self.ws.close_connection()
161         else:
162             self.ws.close()
163
164         # create one more obj
165         human2 = arvados.api('v1').humans().create(body={}).execute()
166
167         # (un)expect the object creation event
168         if close_unexpected:
169             log_object_uuids = []
170             for i in range(0, 2):
171                 event = events.get(True, 5)
172                 if event.get('object_uuid') != None:
173                     log_object_uuids.append(event['object_uuid'])
174             with self.assertRaises(Queue.Empty):
175                 self.assertEqual(events.get(True, 2), None)
176             self.assertNotIn(human['uuid'], log_object_uuids)
177             self.assertIn(human2['uuid'], log_object_uuids)
178         else:
179             with self.assertRaises(Queue.Empty):
180                 self.assertEqual(events.get(True, 2), None)
181
182         # verify log message to ensure that an (un)expected close
183         log_messages = log_file.read()
184         closeLogFound = log_messages.find("Unexpected close. Reconnecting.")
185         retryLogFound = log_messages.find("Error during websocket reconnect. Will retry")
186         if close_unexpected:
187             self.assertNotEqual(closeLogFound, -1)
188         else:
189             self.assertEqual(closeLogFound, -1)
190         rootLogger.removeHandler(fileHandler)
191
192     def test_websocket_reconnect_on_unexpected_close(self):
193         self._test_websocket_reconnect(True)
194
195     def test_websocket_no_reconnect_on_close_by_user(self):
196         self._test_websocket_reconnect(False)
197
198     # Test websocket reconnection retry
199     @mock.patch('arvados.events._EventClient.connect')
200     def test_websocket_reconnect_retry(self, event_client_connect):
201         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
202
203         log_file = tempfile.NamedTemporaryFile(suffix='log.out', delete=True)
204         rootLogger = logging.getLogger()
205         fileHandler = logging.FileHandler(log_file.name)
206         rootLogger.addHandler(fileHandler)
207
208         run_test_server.authorize_with('active')
209         events = Queue.Queue(100)
210
211         filters = [['object_uuid', 'is_a', 'arvados#human']]
212         self.ws = arvados.events.subscribe(
213             arvados.api('v1'), filters,
214             events.put_nowait,
215             poll_fallback=False,
216             last_log_id=None)
217         self.assertIsInstance(self.ws, arvados.events.EventClient)
218
219         # simulate improper close
220         self.ws.on_closed()
221
222         # verify log messages to ensure retry happened
223         log_messages = log_file.read()
224         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect. Will retry")
225         self.assertNotEqual(found, -1)
226         rootLogger.removeHandler(fileHandler)