Merge branch 'master' into 6465-optimize-workbench-integration-tests
[arvados.git] / sdk / python / tests / test_websockets.py
1 import Queue
2 import run_test_server
3 import unittest
4 import arvados
5 import arvados.events
6 import mock
7 import threading
8
9 class WebsocketTest(run_test_server.TestCaseWithServers):
10     MAIN_SERVER = {}
11
12     def setUp(self):
13         self.ws = None
14
15     def tearDown(self):
16         if self.ws:
17             self.ws.close()
18         super(WebsocketTest, self).tearDown()
19
20     def _test_subscribe(self, poll_fallback, expect_type):
21         run_test_server.authorize_with('active')
22         events = Queue.Queue(3)
23         self.ws = arvados.events.subscribe(
24             arvados.api('v1'), [['object_uuid', 'is_a', 'arvados#human']],
25             events.put, poll_fallback=poll_fallback)
26         self.assertIsInstance(self.ws, expect_type)
27         self.assertEqual(200, events.get(True, 10)['status'])
28         human = arvados.api('v1').humans().create(body={}).execute()
29         self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid'])
30         self.assertTrue(events.empty(), "got more events than expected")
31
32     def test_subscribe_websocket(self):
33         self._test_subscribe(
34             poll_fallback=False, expect_type=arvados.events.EventClient)
35
36     @mock.patch('arvados.events.EventClient.__init__')
37     def test_subscribe_poll(self, event_client_constr):
38         event_client_constr.side_effect = Exception('All is well')
39         self._test_subscribe(
40             poll_fallback=1, expect_type=arvados.events.PollClient)