5011: Dry up three different solutions for mocking keep_services().accessible().
[arvados.git] / sdk / python / tests / test_websockets.py
1 import Queue
2 import run_test_server
3 import unittest
4 import arvados
5 import arvados.events
6 import threading
7
class EventTestBase(object):
    """Shared scenario for the event-subscription tests.

    This is a mixin: it calls assertion helpers (``assertIsInstance``,
    ``assertEqual``, ``assertTrue``) and a parent ``tearDown`` that it does
    not define itself, so concrete classes must combine it with a test-case
    base class.  Subclasses must also define ``WS_TYPE``, the class that
    ``arvados.events.subscribe()`` is expected to return.
    """

    def runTest(self):
        """Subscribe to human-object events, create a human, expect one event."""
        run_test_server.authorize_with("admin")
        # events.put is handed to subscribe() as the callback, so received
        # messages end up in this bounded queue.
        events = Queue.Queue(3)
        self.ws = arvados.events.subscribe(
            arvados.api('v1'), [['object_uuid', 'is_a', 'arvados#human']],
            events.put, poll_fallback=2)
        self.assertIsInstance(self.ws, self.WS_TYPE)
        # The first message must report status 200 (wait up to 10 seconds).
        self.assertEqual(200, events.get(True, 10)['status'])
        human = arvados.api('v1').humans().create(body={}).execute()
        # The next message must refer to the object that was just created.
        self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid'])
        self.assertTrue(events.empty(), "got more events than expected")

    def tearDown(self):
        """Close the subscription, if one was opened, then run the parent tearDown.

        ``self.ws`` is only assigned once ``runTest`` gets as far as
        subscribing, so it may be missing here.  The attribute is looked up
        explicitly rather than catching AttributeError around ``close()``,
        so that an AttributeError raised *inside* ``close()`` is reported
        instead of being silently discarded.
        """
        ws = getattr(self, 'ws', None)
        try:
            if ws is not None:
                ws.close()
        finally:
            # Always let the parent class tear down, even if close() failed.
            super(EventTestBase, self).tearDown()
27
28
class WebsocketTest(EventTestBase, run_test_server.TestCaseWithServers):
    """Run the EventTestBase scenario expecting an EventClient subscription.

    NOTE(review): MAIN_SERVER is presumably read by TestCaseWithServers to
    start the API server with websockets enabled -- confirm in
    run_test_server.
    """

    # Class that subscribe() must return for this configuration.
    WS_TYPE = arvados.events.EventClient
    MAIN_SERVER = {'websockets': True}
32
33
class PollClientTest(EventTestBase, run_test_server.TestCaseWithServers):
    """Run the EventTestBase scenario expecting a PollClient subscription.

    NOTE(review): the empty MAIN_SERVER presumably starts the API server
    without websockets, so subscribe() uses polling -- confirm in
    run_test_server.
    """

    # Class that subscribe() must return for this configuration.
    WS_TYPE = arvados.events.PollClient
    MAIN_SERVER = {}