Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / tests / test_websockets.py
1 import Queue
2 import run_test_server
3 import unittest
4 import arvados
5 import arvados.events
6 import threading
7
class EventTestBase(object):
    """Mixin providing the event-subscription test shared by concrete cases.

    Classes using this mixin are expected to supply ``WS_TYPE`` (the class
    the subscription object must be an instance of) and the unittest
    assertion methods used below.
    """

    # Default so that cleanup code reading ``self.ws`` works even when
    # runTest() fails before subscribe() has returned a client.
    ws = None

    def runTest(self):
        # Subscribes to events about 'arvados#human' objects, creates one
        # human, and checks that exactly the expected events arrive.
        run_test_server.authorize_with("admin")
        events = Queue.Queue(3)
        self.ws = arvados.events.subscribe(
            arvados.api('v1'), [['object_uuid', 'is_a', 'arvados#human']],
            events.put, poll_fallback=2)
        self.assertIsInstance(self.ws, self.WS_TYPE)
        # The first queued item must report status 200; wait up to 10
        # seconds for it.
        self.assertEqual(200, events.get(True, 10)['status'])
        human = arvados.api('v1').humans().create(body={}).execute()
        # The next event must refer to the human that was just created.
        self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid'])
        self.assertTrue(events.empty(), "got more events than expected")
20
21
class WebsocketTest(run_test_server.TestCaseWithServers, EventTestBase):
    """Run the shared event test and require an EventClient subscription."""

    # Server options for TestCaseWithServers (defined elsewhere);
    # presumably this requests a test server with websockets enabled.
    MAIN_SERVER = {'websockets': True}
    WS_TYPE = arvados.events.EventClient

    def tearDown(self):
        """Close the subscription, if one was opened, then clean up."""
        # self.ws is only assigned once subscribe() has returned, so it can
        # be missing when the test failed early.
        ws = getattr(self, 'ws', None)
        try:
            if ws:
                ws.close()
        finally:
            # Always run the parent cleanup, even if close() raised.
            super(WebsocketTest, self).tearDown()
30
31
class PollClientTest(run_test_server.TestCaseWithServers, EventTestBase):
    """Run the shared event test and require a PollClient subscription."""

    # Server options for TestCaseWithServers (defined elsewhere); no
    # websockets option is given here, and the test expects subscribe() to
    # return a PollClient.
    MAIN_SERVER = {}
    WS_TYPE = arvados.events.PollClient

    def tearDown(self):
        """Close the subscription, if one was opened, then clean up."""
        # self.ws is only assigned once subscribe() has returned, so it can
        # be missing when the test failed early.
        ws = getattr(self, 'ws', None)
        try:
            if ws:
                ws.close()
        finally:
            # Always run the parent cleanup, even if close() raised.
            super(PollClientTest, self).tearDown()