+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
standard_library.install_aliases()
from builtins import range
from builtins import object
-import arvados
-import io
import logging
import mock
import queue
-from . import run_test_server
+import sys
import threading
import time
import unittest
-from . import arvados_testutil
+import arvados
+from . import arvados_testutil as tutil
+from . import run_test_server
+
class WebsocketTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
TIME_PAST = time.time()-3600
TIME_FUTURE = time.time()+3600
- MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
+ MOCK_WS_URL = 'wss://[{}]/'.format(tutil.TEST_HOST)
TEST_TIMEOUT = 10.0
if self.ws:
self.ws.close()
except Exception as e:
- print(("Error in teardown: ", e))
+ print("Error in teardown: ", e)
super(WebsocketTest, self).tearDown()
run_test_server.reset()
last_log_id=(1 if start_time else None))
self.assertIsInstance(self.ws, expect_type)
self.assertEqual(200, events.get(True, 5)['status'])
+
+ if hasattr(self.ws, '_skip_old_events'):
+ # Avoid race by waiting for the first "find ID threshold"
+ # poll to finish.
+ deadline = time.time() + 10
+ while not self.ws._skip_old_events:
+ self.assertLess(time.time(), deadline)
+ time.sleep(0.1)
human = arvados.api('v1').humans().create(body={}).execute()
want_uuids = []
error_mock = mock.MagicMock()
error_mock.resp.status = 0
error_mock._get_reason.return_value = "testing"
- api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
- {"items": [{"id": 1}], "items_available": 1},
- arvados.errors.ApiError(error_mock, ""),
- {"items": [{"id": 1}], "items_available": 1})
+ api_mock.logs().list().execute.side_effect = (
+ arvados.errors.ApiError(error_mock, b""),
+ {"items": [{"id": 1}], "items_available": 1},
+ arvados.errors.ApiError(error_mock, b""),
+ {"items": [{"id": 1}], "items_available": 1},
+ )
pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
pc.start()
while len(n) < 2:
"""Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
return '{:+03d}:{:02d}'.format(offset//60, offset%60)
- # Test websocket reconnection on (un)execpted close
+ # Test websocket reconnection on (un)expected close
def _test_websocket_reconnect(self, close_unexpected):
run_test_server.authorize_with('active')
events = queue.Queue(100)
- logstream = io.BytesIO()
+ logstream = tutil.StringIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)
def test_websocket_reconnect_retry(self, event_client_connect):
event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
- logstream = io.BytesIO()
+ logstream = tutil.StringIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)