From b0e9a800435e6231317e5123c9b6a1b5b6397a5e Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 9 Jun 2016 17:55:37 -0400 Subject: [PATCH] 9388: Process each notify individually instead attempting to batch them up. Prior to this commit, websockets used to try to send log events out in batches, by getting all logs with an id greater than last log that was sent. Unfortunately, under concurrent database writes, logs from uncommited transactions may not appear in the query even if logs with larger ids do appear. This results in the uncommitted log never being sent out because subsequent batch sends would not consider logs prior to the last log id that was sent (which, in this case, is higher than the log that was missed.) This commit eliminates the batching behavior. Because NOTIFY includes the log id of a specific record that was committed, consider only the log record with that id and process events in the order that the NOTIFY events arrive. This means events may be delivered out of numeric order (although they now more closely reflect the "actual" order, e.g. the order that the events were actually committed to the database). "Catch ups" where the client has specified a last_log_id and needs to have past logs replayed continue to be sent in batches. --- sdk/python/arvados/commands/ws.py | 4 ++++ services/api/lib/eventbus.rb | 25 ++++++++++--------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py index 347075dffd..988ec961da 100644 --- a/sdk/python/arvados/commands/ws.py +++ b/sdk/python/arvados/commands/ws.py @@ -15,6 +15,7 @@ def main(arguments=None): parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid") parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)") parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss") + parser.add_argument('-i', '--id', type=int, default="", help="Start from given log id.") group = parser.add_mutually_exclusive_group() group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds") @@ -67,6 +68,9 @@ def main(arguments=None): else: last_log_id = None + if args.id: + last_log_id = args.id-1 + def on_message(ev): global filters global ws diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index 9bf95f5735..4988d59f2e 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -67,11 +67,6 @@ class EventBus # push_events call if there are more log rows to send. def push_events ws, notify_id begin - if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id - # This notify is for a row we've handled already. - return - end - # Must have at least one filter set up to receive events if ws.filters.length > 0 # Start with log rows readable by user, sorted in ascending order @@ -82,13 +77,12 @@ class EventBus param_out = [] if !ws.last_log_id.nil? - # Client is only interested in log rows that are newer than the - # last log row seen by the client. + # We are catching up from some starting point. cond_id = "logs.id > ?" param_out << ws.last_log_id elsif !notify_id.nil? - # No last log id, so look at rows starting with notify id - cond_id = "logs.id >= ?" + # Get new row being notified. + cond_id = "logs.id = ?" param_out << notify_id else # No log id to start from, nothing to do, return @@ -118,9 +112,10 @@ class EventBus count = 0 limit = 10 + lastid = nil logs.limit(limit).each do |l| ws.send(l.as_api_response.to_json) - ws.last_log_id = l.id + lastid = l.id count += 1 end @@ -128,13 +123,13 @@ class EventBus # Number of rows returned was capped by limit(), we need to schedule # another query to get more logs (will start from last_log_id # reported by current query) + ws.last_log_id = lastid EventMachine::next_tick do - push_events ws, nil + push_events ws, notify_id end - elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id) - # Number of rows returned was less than cap, but the notify id is - # higher than the last id visible to the client, so update last_log_id - ws.last_log_id = notify_id + elsif !ws.last_log_id.nil? + # Done catching up + ws.last_log_id = nil end elsif !notify_id.nil? # No filters set up, so just record the sequence number -- 2.30.2