self.fail()
def runTest(self):
    """Subscribe to 'human' object events, create a human, and wait for the event.

    Steps:
      1. Authorize as "admin" and build an API client.
      2. Subscribe to events whose object_uuid is an ``arvados#human``,
         with ``self.on_event`` as the callback, and check that the
         returned client is an instance of ``self.WS_TYPE``.
      3. Wait for ``self.subscribed``, create a human record, then wait
         for ``self.done``.

    ``self.subscribed`` and ``self.done`` are presumably set by
    ``self.on_event`` (defined outside this block -- confirm there).

    Fails if the client has the wrong type or if either event is not set
    within 10 seconds.
    """
    # Set before anything that can raise, so tearDown can always tell
    # whether an event client was actually created.
    self.ws = None
    self.state = 1
    self.subscribed = threading.Event()
    self.done = threading.Event()

    run_test_server.authorize_with("admin")
    api = arvados.api('v1', cache=False)

    # The subscription gets its own API client object; `api` is kept for
    # the create() request below.
    self.ws = arvados.events.subscribe(
        arvados.api('v1', cache=False),
        [['object_uuid', 'is_a', 'arvados#human']],
        self.on_event,
        poll_fallback=2)
    self.assertIsInstance(self.ws, self.WS_TYPE)

    # Event.wait() returns False on timeout; assert on it so a missing
    # confirmation or a missing event fails the test instead of being
    # silently ignored.
    self.assertTrue(self.subscribed.wait(10),
                    "subscription was not confirmed within 10 seconds")
    self.h = api.humans().create(body={}).execute()
    self.assertTrue(self.done.wait(10),
                    "no event was handled within 10 seconds of the create")
# Client class that runTest asserts arvados.events.subscribe() returned
# (checked there through self.WS_TYPE).
# NOTE(review): the header of the class owning this attribute is not visible
# here -- presumably WebsocketTest; confirm.
WS_TYPE = arvados.events.EventClient
def tearDown(self):
    """Close the event client, if one was created, then run the inherited teardown.

    The inherited teardown runs even when closing the client raises, so a
    failing close() cannot skip the cleanup done by the base classes.
    """
    # getattr: self.ws is only assigned inside the test method, so do not
    # assume the attribute exists.
    ws = getattr(self, 'ws', None)
    try:
        if ws is not None:
            ws.close()
    finally:
        # Name this class (not a base class) so the lookup starts at the
        # next class in the MRO and no base-class tearDown is skipped.
        super(WebsocketTest, self).tearDown()
class PollClientTest(run_test_server.TestCaseWithServers, EventTestBase):
    """Event test case that expects the subscription to be a PollClient.

    The test body itself is not defined here; presumably it is inherited
    from EventTestBase (not visible in this section -- confirm).
    """

    # Client class the test expects arvados.events.subscribe() to return.
    WS_TYPE = arvados.events.PollClient

    def tearDown(self):
        """Close the event client, if one was created, then run the inherited teardown.

        The inherited teardown runs even when closing the client raises.
        """
        # getattr: self.ws is only assigned inside the test method, so do
        # not assume the attribute exists.
        ws = getattr(self, 'ws', None)
        try:
            if ws is not None:
                ws.close()
        finally:
            # Name this class (not a base class) so the lookup starts at
            # the next class in the MRO and no base-class tearDown is
            # skipped.
            super(PollClientTest, self).tearDown()