-import arvados
-import io
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import print_function
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
import logging
import mock
-import Queue
-import run_test_server
+import queue
+import sys
import threading
import time
import unittest
-import arvados_testutil
+import arvados
+from . import arvados_testutil as tutil
+from . import run_test_server
+
class WebsocketTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
TIME_PAST = time.time()-3600
TIME_FUTURE = time.time()+3600
- MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
+ MOCK_WS_URL = 'wss://[{}]/'.format(tutil.TEST_HOST)
TEST_TIMEOUT = 10.0
def _test_subscribe(self, poll_fallback, expect_type, start_time=None, expected=1):
run_test_server.authorize_with('active')
- events = Queue.Queue(100)
+ events = queue.Queue(100)
# Create ancestor before subscribing.
# When listening with start_time in the past, this should also be retrieved.
last_log_id=(1 if start_time else None))
self.assertIsInstance(self.ws, expect_type)
self.assertEqual(200, events.get(True, 5)['status'])
+
+ if hasattr(self.ws, '_skip_old_events'):
+ # Avoid race by waiting for the first "find ID threshold"
+ # poll to finish.
+ deadline = time.time() + 10
+ while not self.ws._skip_old_events:
+ self.assertLess(time.time(), deadline)
+ time.sleep(0.1)
human = arvados.api('v1').humans().create(body={}).execute()
want_uuids = []
log_object_uuids.append(events.get(True, 5)['object_uuid'])
if expected < 2:
- with self.assertRaises(Queue.Empty):
+ with self.assertRaises(queue.Empty):
# assertEqual just serves to show us what unexpected
# thing comes out of the queue when the assertRaises
# fails; when the test passes, this assertEqual
error_mock = mock.MagicMock()
error_mock.resp.status = 0
error_mock._get_reason.return_value = "testing"
- api_mock.logs().list().execute.side_effect = (arvados.errors.ApiError(error_mock, ""),
- {"items": [{"id": 1}], "items_available": 1},
- arvados.errors.ApiError(error_mock, ""),
- {"items": [{"id": 1}], "items_available": 1})
+ api_mock.logs().list().execute.side_effect = (
+ arvados.errors.ApiError(error_mock, b""),
+ {"items": [{"id": 1}], "items_available": 1},
+ arvados.errors.ApiError(error_mock, b""),
+ {"items": [{"id": 1}], "items_available": 1},
+ )
pc = arvados.events.PollClient(api_mock, [], on_ev, 15, None)
pc.start()
while len(n) < 2:
return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(t))
def localiso(self, t):
    """Format epoch timestamp ``t`` as local wall-clock time plus UTC offset.

    Returns a string shaped like ``YYYY-MM-DDTHH:MM:SS+HH:MM``. The
    date/time fields come from ``time.localtime(t)`` and the trailing zone
    designator is produced by ``self.isotz()``.
    """
    local = time.localtime(t)
    # time.timezone is the *non-DST* offset. When DST is in effect at ``t``
    # the fields returned by localtime() are already shifted, so the matching
    # offset is time.altzone; using time.timezone there would label the
    # timestamp with an offset that is off by the DST adjustment.
    seconds_west = time.altzone if local.tm_isdst > 0 else time.timezone
    return time.strftime('%Y-%m-%dT%H:%M:%S', local) + self.isotz(-seconds_west // 60)
def isotz(self, offset):
    """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator.

    ``offset`` is an integer number of minutes east of UTC (negative for
    zones west of UTC). Returns a string of the form ``+HH:MM`` or
    ``-HH:MM``.
    """
    # Format the sign and the magnitude separately. Floor division and modulo
    # applied directly to a negative offset round the hour away from zero, so
    # -210 minutes would render as "-04:30" instead of "-03:30".
    sign = '-' if offset < 0 else '+'
    hours, minutes = divmod(abs(offset), 60)
    return '{}{:02d}:{:02d}'.format(sign, hours, minutes)
- # Test websocket reconnection on (un)execpted close
+ # Test websocket reconnection on (un)expected close
def _test_websocket_reconnect(self, close_unexpected):
run_test_server.authorize_with('active')
- events = Queue.Queue(100)
+ events = queue.Queue(100)
- logstream = io.BytesIO()
+ logstream = tutil.StringIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)
# expect an event
self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
- with self.assertRaises(Queue.Empty):
+ with self.assertRaises(queue.Empty):
self.assertEqual(events.get(True, 2), None)
# close (im)properly
event = events.get(True, 5)
if event.get('object_uuid') != None:
log_object_uuids.append(event['object_uuid'])
- with self.assertRaises(Queue.Empty):
+ with self.assertRaises(queue.Empty):
self.assertEqual(events.get(True, 2), None)
self.assertNotIn(human['uuid'], log_object_uuids)
self.assertIn(human2['uuid'], log_object_uuids)
else:
- with self.assertRaises(Queue.Empty):
+ with self.assertRaises(queue.Empty):
self.assertEqual(events.get(True, 2), None)
# verify log message to ensure that an (un)expected close
def test_websocket_reconnect_retry(self, event_client_connect):
event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
- logstream = io.BytesIO()
+ logstream = tutil.StringIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)
run_test_server.authorize_with('active')
- events = Queue.Queue(100)
+ events = queue.Queue(100)
filters = [['object_uuid', 'is_a', 'arvados#human']]
self.ws = arvados.events.subscribe(