# Shared test body for the event-subscription tests. The test-case classes
# that inherit from this base supply WS_TYPE, the client class that
# arvados.events.subscribe() is expected to return.
8 class EventTestBase(object):
# NOTE(review): the statements below use `self`, so they belong to a method
# whose header is not visible in this excerpt -- confirm against the full file.
10 run_test_server.authorize_with("admin")
# Bounded queue (at most 3 items) that collects what the subscription delivers.
11 events = Queue.Queue(3)
# Subscribe with a filter on object_uuid 'is_a' arvados#human. events.put is
# handed to subscribe() so delivered events land in the queue (presumably as
# the per-event callback -- verify against arvados.events.subscribe).
# NOTE(review): the meaning and units of poll_fallback=2 are not visible here.
12 self.ws = arvados.events.subscribe(
13 arvados.api('v1'), [['object_uuid', 'is_a', 'arvados#human']],
14 events.put, poll_fallback=2)
# The returned client must be of the type the concrete test case expects.
15 self.assertIsInstance(self.ws, self.WS_TYPE)
# Block for up to 10 seconds for the first message; it must report status 200.
16 self.assertEqual(200, events.get(True, 10)['status'])
# Create a human record, then expect the next event (again waiting up to
# 10 seconds) to carry that record's uuid as its object_uuid.
17 human = arvados.api('v1').humans().create(body={}).execute()
18 self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid'])
# Nothing further should be queued at this point.
19 self.assertTrue(events.empty(), "got more events than expected")
# Runs the shared EventTestBase body with websockets enabled, expecting
# subscribe() to return an arvados.events.EventClient.
22 class WebsocketTest(run_test_server.TestCaseWithServers, EventTestBase):
# NOTE(review): MAIN_SERVER is presumably read by TestCaseWithServers to
# configure the test server -- confirm in run_test_server.
23 MAIN_SERVER = {'websockets': True}
# Client class checked by the assertIsInstance call in EventTestBase.
24 WS_TYPE = arvados.events.EventClient
# NOTE(review): this call sits inside a method whose header is not visible in
# this excerpt; it delegates to the next tearDown in the MRO.
29 super(WebsocketTest, self).tearDown()
# Runs the shared EventTestBase body expecting subscribe() to return an
# arvados.events.PollClient.
32 class PollClientTest(run_test_server.TestCaseWithServers, EventTestBase):
# Client class checked by the assertIsInstance call in EventTestBase.
34 WS_TYPE = arvados.events.PollClient
# NOTE(review): this call sits inside a method whose header is not visible in
# this excerpt; it delegates to the next tearDown in the MRO.
39 super(PollClientTest, self).tearDown()