7658: update EventClient.on_closed to retry on connect errors.
[arvados.git] / sdk / python / tests / test_websockets.py
1 import arvados
2 import arvados.events
3 from datetime import datetime, timedelta, tzinfo
4 import mock
5 import Queue
6 import run_test_server
7 import threading
8 import time
9 import unittest
10
11 class WebsocketTest(run_test_server.TestCaseWithServers):
12     MAIN_SERVER = {}
13
14     TIME_PAST = time.time()-3600
15     TIME_FUTURE = time.time()+3600
16
17     def setUp(self):
18         self.ws = None
19
20     def tearDown(self):
21         if self.ws:
22             self.ws.close()
23         super(WebsocketTest, self).tearDown()
24         run_test_server.reset()
25
26     def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
27         run_test_server.authorize_with('active')
28         events = Queue.Queue(100)
29
30         # Create ancestor before subscribing.
31         # When listening with start_time in the past, this should also be retrieved.
32         # However, when start_time is omitted in subscribe, this should not be fetched.
33         ancestor = arvados.api('v1').humans().create(body={}).execute()
34
35         filters = [['object_uuid', 'is_a', 'arvados#human']]
36         if start_time:
37             filters.append(['created_at', '>=', start_time])
38
39         self.ws = arvados.events.subscribe(
40             arvados.api('v1'), filters,
41             events.put_nowait,
42             poll_fallback=poll_fallback,
43             last_log_id=(1 if start_time else None))
44         self.assertIsInstance(self.ws, expect_type)
45         self.assertEqual(200, events.get(True, 5)['status'])
46         human = arvados.api('v1').humans().create(body={}).execute()
47
48         log_object_uuids = []
49         for i in range(0, expected):
50             log_object_uuids.append(events.get(True, 5)['object_uuid'])
51
52         if expected > 0:
53             self.assertIn(human['uuid'], log_object_uuids)
54
55         if expected > 1:
56             self.assertIn(ancestor['uuid'], log_object_uuids)
57
58         with self.assertRaises(Queue.Empty):
59             # assertEqual just serves to show us what unexpected thing
60             # comes out of the queue when the assertRaises fails; when
61             # the test passes, this assertEqual doesn't get called.
62             self.assertEqual(events.get(True, 2), None)
63
64     def test_subscribe_websocket(self):
65         self._test_subscribe(
66             poll_fallback=False, expect_type=arvados.events.EventClient, expected=1)
67
68     @mock.patch('arvados.events.EventClient.__init__')
69     def test_subscribe_poll(self, event_client_constr):
70         event_client_constr.side_effect = Exception('All is well')
71         self._test_subscribe(
72             poll_fallback=0.25, expect_type=arvados.events.PollClient, expected=1)
73
74     def test_subscribe_websocket_with_start_time_past(self):
75         self._test_subscribe(
76             poll_fallback=False, expect_type=arvados.events.EventClient,
77             start_time=self.localiso(self.TIME_PAST),
78             expected=2)
79
80     @mock.patch('arvados.events.EventClient.__init__')
81     def test_subscribe_poll_with_start_time_past(self, event_client_constr):
82         event_client_constr.side_effect = Exception('All is well')
83         self._test_subscribe(
84             poll_fallback=0.25, expect_type=arvados.events.PollClient,
85             start_time=self.localiso(self.TIME_PAST),
86             expected=2)
87
88     def test_subscribe_websocket_with_start_time_future(self):
89         self._test_subscribe(
90             poll_fallback=False, expect_type=arvados.events.EventClient,
91             start_time=self.localiso(self.TIME_FUTURE),
92             expected=0)
93
94     @mock.patch('arvados.events.EventClient.__init__')
95     def test_subscribe_poll_with_start_time_future(self, event_client_constr):
96         event_client_constr.side_effect = Exception('All is well')
97         self._test_subscribe(
98             poll_fallback=0.25, expect_type=arvados.events.PollClient,
99             start_time=self.localiso(self.TIME_FUTURE),
100             expected=0)
101
102     def test_subscribe_websocket_with_start_time_past_utc(self):
103         self._test_subscribe(
104             poll_fallback=False, expect_type=arvados.events.EventClient,
105             start_time=self.utciso(self.TIME_PAST),
106             expected=2)
107
108     def test_subscribe_websocket_with_start_time_future_utc(self):
109         self._test_subscribe(
110             poll_fallback=False, expect_type=arvados.events.EventClient,
111             start_time=self.utciso(self.TIME_FUTURE),
112             expected=0)
113
114     def utciso(self, t):
115         return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))
116
117     def localiso(self, t):
118         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
119
120     def isotz(self, offset):
121         """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
122         return '{:+03d}{:02d}'.format(offset/60, offset%60)
123
124     # Test websocket reconnection on unexecpted close
125     def _test_websocket_reconnect(self, close_unexpected):
126         run_test_server.authorize_with('active')
127         events = Queue.Queue(100)
128
129         filters = [['object_uuid', 'is_a', 'arvados#human']]
130         filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
131         self.ws = arvados.events.subscribe(
132             arvados.api('v1'), filters,
133             events.put_nowait,
134             poll_fallback=False,
135             last_log_id=None)
136         self.assertIsInstance(self.ws, arvados.events.EventClient)
137         self.assertEqual(200, events.get(True, 5)['status'])
138
139         # create obj
140         human = arvados.api('v1').humans().create(body={}).execute()
141
142         # expect an event
143         self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
144         with self.assertRaises(Queue.Empty):
145             self.assertEqual(events.get(True, 2), None)
146
147         # close (im)properly
148         if close_unexpected:
149             self.ws.close_connection()
150         else:
151             self.ws.close()
152
153         # create one more obj
154         human2 = arvados.api('v1').humans().create(body={}).execute()
155
156         # (un)expect the object creation event
157         if close_unexpected:
158             log_object_uuids = []
159             for i in range(0, 2):
160                 event = events.get(True, 5)
161                 if event.get('object_uuid') != None:
162                     log_object_uuids.append(event['object_uuid'])
163             with self.assertRaises(Queue.Empty):
164                 self.assertEqual(events.get(True, 2), None)
165             self.assertNotIn(human['uuid'], log_object_uuids)
166             self.assertIn(human2['uuid'], log_object_uuids)
167         else:
168             with self.assertRaises(Queue.Empty):
169                 self.assertEqual(events.get(True, 2), None)
170
171     def test_websocket_reconnect_on_unexpected_close(self):
172         self._test_websocket_reconnect(True)
173
174     def test_websocket_no_reconnect_on_close_by_user(self):
175         self._test_websocket_reconnect(False)