# NOTE(review): this listing is a sparse excerpt. Each line starts with its
# original file line number, and the lines in between (method headers, branch
# bodies, assignments to self.state) are not visible here.
#
# Shared event-subscription test logic. WebsocketTest and PollClientTest list
# this class as their second base and supply the WS_TYPE attribute read below.
7 class EventTestBase(object):
# Callback handed to arvados.events.subscribe() on original line 27; it is
# called with an event mapping `ev`.
8 def on_event(self, ev):
# Expects the event to report status 200. Presumably this is the reply that
# acknowledges the subscription -- the guarding condition is not visible.
10 self.assertEqual(200, ev['status'])
# Expects the event to refer to the record stored in self.h (the human created
# on original line 31).
14 self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
# Two threading.Event flags. self.subscribed is waited on at original line 30;
# where either flag is set, and where self.done is waited on, is not visible.
22 self.subscribed = threading.Event()
23 self.done = threading.Event()
# Presumably selects the "admin" test credentials for the API calls that
# follow -- confirm against run_test_server.authorize_with.
25 run_test_server.authorize_with("admin")
26 api = arvados.api('v1', cache=False)
# Subscribes self.on_event with the filter object_uuid is_a arvados#human.
# NOTE(review): a second arvados.api(...) client is built here instead of
# reusing `api` from the previous line -- confirm whether that is intentional.
# poll_fallback=2 is presumably the polling interval (seconds) used when a
# websocket connection is unavailable -- confirm against arvados.events.subscribe.
27 self.ws = arvados.events.subscribe(arvados.api('v1', cache=False), [['object_uuid', 'is_a', 'arvados#human']], self.on_event, poll_fallback=2)
# Checks that subscribe() returned the client class the concrete test expects
# (self.WS_TYPE); the body of this branch is not visible.
28 if not isinstance(self.ws, self.WS_TYPE):
# Waits up to 10 seconds for self.subscribed to be set. The result of wait()
# is discarded, so a timeout does not fail the test at this point.
30 self.subscribed.wait(10)
# Creates a human record with an empty body; on_event compares its uuid with
# the object_uuid of an incoming event (original line 14).
31 self.h = api.humans().create(body={}).execute()
# Final check: self.state must be 3. The assignments to self.state are not
# visible; presumably on_event advances it as events arrive.
33 self.assertEqual(3, self.state)
# Runs the EventTestBase logic with MAIN_SERVER = {'websockets': True} and
# expects subscribe() to return an arvados.events.EventClient.
# NOTE(review): sparse excerpt -- the lines between 37 and 41 (including the
# header of the method that contains line 41) are not visible.
35 class WebsocketTest(run_test_server.TestCaseWithServers, EventTestBase):
# Presumably read by TestCaseWithServers to decide how to start the API
# server -- confirm in run_test_server.
36 MAIN_SERVER = {'websockets': True}
# Client class that EventTestBase checks self.ws against with isinstance().
37 WS_TYPE = arvados.events.EventClient
# NOTE(review): super(run_test_server.TestCaseWithServers, self) starts the MRO
# lookup *after* TestCaseWithServers, so a tearDown defined on
# TestCaseWithServers itself would not be called from here. If the intent is to
# chain to the parent normally, this would be super(WebsocketTest, self) --
# confirm before changing.
41 super(run_test_server.TestCaseWithServers, self).tearDown()
# Runs the EventTestBase logic and expects subscribe() to return an
# arvados.events.PollClient. No MAIN_SERVER override is visible in this
# excerpt (original line 45 is not shown).
# NOTE(review): sparse excerpt -- the header of the method that contains
# line 50 is not visible.
44 class PollClientTest(run_test_server.TestCaseWithServers, EventTestBase):
# Client class that EventTestBase checks self.ws against with isinstance().
46 WS_TYPE = arvados.events.PollClient
# NOTE(review): super(run_test_server.TestCaseWithServers, self) starts the MRO
# lookup *after* TestCaseWithServers, so a tearDown defined on
# TestCaseWithServers itself would not be called from here. If the intent is to
# chain to the parent normally, this would be super(PollClientTest, self) --
# confirm before changing.
50 super(run_test_server.TestCaseWithServers, self).tearDown()