From 1c6515a4445e9fd79961c9c8caf21528f16d9399 Mon Sep 17 00:00:00 2001 From: radhika Date: Sat, 18 Jul 2015 09:21:05 -0400 Subject: [PATCH] 6473: include last_log_id when start_time argument is provided. --- sdk/python/arvados/commands/ws.py | 7 +++-- sdk/python/arvados/events.py | 17 ++++++----- sdk/python/tests/test_websockets.py | 47 +++++++++++++++++++++-------- 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py index 26d528a909..be48dc0cb9 100644 --- a/sdk/python/arvados/commands/ws.py +++ b/sdk/python/arvados/commands/ws.py @@ -14,7 +14,7 @@ def main(arguments=None): parser = argparse.ArgumentParser() parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid") parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)") - parser.add_argument('-s', '--start_time', type=str, default="", help="Arvados query filter to apply to log events created after this time. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss") + parser.add_argument('-s', '--start_time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss") group = parser.add_mutually_exclusive_group() group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds") @@ -62,7 +62,8 @@ def main(arguments=None): filters += [ ['object_uuid', '=', args.pipeline] ] if args.start_time: - filters += [ ['created_at', '>', args.start_time] ] + args.last_log_id = 1 + filters += [ ['created_at', '>=', args.start_time] ] def on_message(ev): global filters @@ -89,7 +90,7 @@ def main(arguments=None): print json.dumps(ev) try: - ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval) + ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval, last_log_id=args.last_log_id) if ws: if args.pipeline: c = api.pipeline_instances().get(uuid=args.pipeline).execute() diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py index 3036a25fe0..b1872002eb 100644 --- a/sdk/python/arvados/events.py +++ b/sdk/python/arvados/events.py @@ -14,7 +14,7 @@ from ws4py.client.threadedclient import WebSocketClient _logger = logging.getLogger('arvados.events') class EventClient(WebSocketClient): - def __init__(self, url, filters, on_event): + def __init__(self, url, filters, on_event, last_log_id): ssl_options = {'ca_certs': arvados.util.ca_certs_path()} if config.flag_is_true('ARVADOS_API_HOST_INSECURE'): ssl_options['cert_reqs'] = ssl.CERT_NONE @@ -28,9 +28,10 @@ class EventClient(WebSocketClient): super(EventClient, self).__init__(url, ssl_options=ssl_options) self.filters = filters self.on_event = on_event + self.last_log_id = last_log_id def opened(self): - self.subscribe(self.filters) + self.subscribe(self.filters, self.last_log_id) def received_message(self, m): self.on_event(json.loads(str(m))) @@ -109,13 +110,13 @@ class PollClient(threading.Thread): del self.filters[self.filters.index(filters)] -def _subscribe_websocket(api, filters, on_event): +def _subscribe_websocket(api, filters, on_event, last_log_id=None): endpoint = api._rootDesc.get('websocketUrl', None) if not endpoint: raise errors.FeatureNotEnabledError( "Server does not advertise a websocket endpoint") uri_with_token = "{}?api_token={}".format(endpoint, api.api_token) - client = EventClient(uri_with_token, filters, on_event) + client = EventClient(uri_with_token, filters, on_event, last_log_id) ok = False try: client.connect() @@ -125,7 +126,7 @@ def _subscribe_websocket(api, filters, on_event): if not ok: client.close_connection() -def subscribe(api, filters, on_event, poll_fallback=15): +def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None): """ :api: a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe(). @@ -135,13 +136,15 @@ def subscribe(api, filters, on_event, poll_fallback=15): The callback when a message is received. :poll_fallback: If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available. + :last_log_id: + Log rows that are newer than the log id """ if not poll_fallback: - return _subscribe_websocket(api, filters, on_event) + return _subscribe_websocket(api, filters, on_event, last_log_id) try: - return _subscribe_websocket(api, filters, on_event) + return _subscribe_websocket(api, filters, on_event, last_log_id) except Exception as e: _logger.warn("Falling back to polling after websocket error: %s" % e) p = PollClient(api, filters, on_event, poll_fallback) diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py index e82d569bc0..c62397f639 100644 --- a/sdk/python/tests/test_websockets.py +++ b/sdk/python/tests/test_websockets.py @@ -6,6 +6,7 @@ import arvados.events import mock import threading from datetime import datetime, timedelta +import time class WebsocketTest(run_test_server.TestCaseWithServers): MAIN_SERVER = {} @@ -18,20 +19,40 @@ class WebsocketTest(run_test_server.TestCaseWithServers): self.ws.close() super(WebsocketTest, self).tearDown() - def _test_subscribe(self, poll_fallback, expect_type, additional_filters=None): + def _test_subscribe(self, poll_fallback, expect_type, last_log_id=None, additional_filters=None, expected=1): run_test_server.authorize_with('active') - events = Queue.Queue(3) + events = Queue.Queue(100) filters = [['object_uuid', 'is_a', 'arvados#human']] if additional_filters: filters = filters + additional_filters + + # Create an extra object before subscribing and verify that as well + ancestor = arvados.api('v1').humans().create(body={}).execute() + time.sleep(5) + self.ws = arvados.events.subscribe( arvados.api('v1'), filters, - events.put, poll_fallback=poll_fallback) + events.put, poll_fallback=poll_fallback, last_log_id=last_log_id) self.assertIsInstance(self.ws, expect_type) self.assertEqual(200, events.get(True, 10)['status']) human = arvados.api('v1').humans().create(body={}).execute() - self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid']) - self.assertTrue(events.empty(), "got more events than expected") + + if last_log_id == None or expected == 0: + self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid']) + self.assertTrue(events.empty(), "got more events than expected") + else: + log_events = [] + for i in range(0, 10): + try: + event = events.get(True, 10) + self.assertTrue(event['object_uuid'] is not None) + log_events.append(event['object_uuid']) + except: + break; + + self.assertTrue(len(log_events)>1) + self.assertTrue(human['uuid'] in log_events) + self.assertTrue(ancestor['uuid'] in log_events) def test_subscribe_websocket(self): self._test_subscribe( @@ -40,28 +61,28 @@ class WebsocketTest(run_test_server.TestCaseWithServers): def test_subscribe_websocket_with_start_time_today(self): now = datetime.today() self._test_subscribe( - poll_fallback=False, expect_type=arvados.events.EventClient, - additional_filters=[['created_at', '>', now.strftime('%Y-%m-%d')]]) + poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1, + additional_filters=[['created_at', '>=', now.strftime('%Y-%m-%d')]]) def test_subscribe_websocket_with_start_time_last_hour(self): lastHour = datetime.today() - timedelta(hours = 1) self._test_subscribe( - poll_fallback=False, expect_type=arvados.events.EventClient, - additional_filters=[['created_at', '>', lastHour.strftime('%Y-%m-%d %H:%M:%S')]]) + poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1, + additional_filters=[['created_at', '>=', lastHour.strftime('%Y-%m-%d %H:%M:%S')]]) def test_subscribe_websocket_with_start_time_next_hour(self): nextHour = datetime.today() + timedelta(hours = 1) with self.assertRaises(Queue.Empty): self._test_subscribe( - poll_fallback=False, expect_type=arvados.events.EventClient, - additional_filters=[['created_at', '>', nextHour.strftime('%Y-%m-%d %H:%M:%S')]]) + poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1, + additional_filters=[['created_at', '>=', nextHour.strftime('%Y-%m-%d %H:%M:%S')]], expected=0) def test_subscribe_websocket_with_start_time_tomorrow(self): tomorrow = datetime.today() + timedelta(hours = 24) with self.assertRaises(Queue.Empty): self._test_subscribe( - poll_fallback=False, expect_type=arvados.events.EventClient, - additional_filters=[['created_at', '>', tomorrow.strftime('%Y-%m-%d')]]) + poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1, + additional_filters=[['created_at', '>=', tomorrow.strftime('%Y-%m-%d')]], expected=0) @mock.patch('arvados.events.EventClient.__init__') def test_subscribe_poll(self, event_client_constr): -- 2.39.5