def on_closed(self):
if self.is_closed == False:
+ _logger.warn("Unexpected close. Reconnecting.")
self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
while True:
try:
self.ec.subscribe(self.subscriptions[s], self.last_log_id)
break
except:
- _logger.warn("Failed to reconnect to websockets on %s. Will retry after 5s." % endpoint)
+ _logger.warn("Error during websocket reconnect. Will retry after 5s.")
time.sleep(5)
import arvados
import arvados.events
from datetime import datetime, timedelta, tzinfo
+import logging
import mock
+import os
import Queue
import run_test_server
+import tempfile
import threading
import time
import unittest
self.ws = None
def tearDown(self):
- if self.ws:
- self.ws.close()
+ try:
+ if self.ws:
+ self.ws.close()
+ except Exception as e:
+ print("Error in teardown: ", e)
super(WebsocketTest, self).tearDown()
run_test_server.reset()
"""Convert minutes-east-of-UTC to ISO8601 time zone designator"""
return '{:+03d}{:02d}'.format(offset/60, offset%60)
- # Test websocket reconnection on unexecpted close
+ # Test websocket reconnection on (un)execpted close
def _test_websocket_reconnect(self, close_unexpected):
run_test_server.authorize_with('active')
events = Queue.Queue(100)
def test_websocket_no_reconnect_on_close_by_user(self):
self._test_websocket_reconnect(False)
+
+ # Test websocket reconnection retry
+ @mock.patch('arvados.events._EventClient.connect')
+ def test_websocket_reconnect_retry(self, event_client_connect):
+ event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
+
+ log_file = tempfile.NamedTemporaryFile(suffix='log.out', delete=True)
+ logging.basicConfig(filename=log_file.name, level=logging.DEBUG,)
+
+ run_test_server.authorize_with('active')
+ events = Queue.Queue(100)
+
+ filters = [['object_uuid', 'is_a', 'arvados#human']]
+ self.ws = arvados.events.subscribe(
+ arvados.api('v1'), filters,
+ events.put_nowait,
+ poll_fallback=False,
+ last_log_id=None)
+ self.assertIsInstance(self.ws, arvados.events.EventClient)
+
+ # simulate improper close
+ self.ws.on_closed()
+
+ # verify log messages to ensure retry happened
+ log_messages = log_file.read()
+ found = log_messages.find("Error during websocket reconnect. Will retry")
+ self.assertNotEqual(found, -1)
+ os.unlink(log_file.name)