- run_test_server.authorize_with("admin")
- api = arvados.api('v1', cache=False)
- ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev), poll_fallback=2)
- time.sleep(1)
- self.h = api.humans().create(body={}).execute()
- time.sleep(5)
- self.assertEqual(3, self.state)
- ws.close()
+class PollClientTest(run_test_server.TestCaseWithServers, EventTestBase):
+ MAIN_SERVER = {}
+ WS_TYPE = arvados.events.PollClient
+
+ def tearDown(self):
+ self.ws.close()
+ super(run_test_server.TestCaseWithServers, self).tearDown()