Prior to this commit, websockets tried to send log events out in batches,
by fetching all logs with an id greater than the last log that was sent.
Unfortunately, under concurrent database writes, logs from uncommitted
transactions may not appear in the query even when logs with larger ids do.
As a result, the uncommitted log is never sent, because subsequent batch
sends do not consider logs prior to the last log id that was sent (which,
in this case, is higher than the id of the log that was missed).
This commit eliminates the batching behavior. Because each NOTIFY includes
the id of a specific log record that was committed, we now consider only the
log record with that id and process events in the order the NOTIFY events
arrive. This means events may be delivered out of numeric order, although
they now more closely reflect the "actual" order, i.e. the order in which
the events were committed to the database.
"Catch-ups", where the client has specified a last_log_id and needs past
logs replayed, continue to be sent in batches.
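The race described above can be sketched with a small in-memory model (a simplified illustration with made-up names, not Arvados code):

```python
# Sketch of the id-gap race: two writers take ids from a sequence, but the
# writer holding the smaller id commits last. Batch polling by high-water
# mark loses that row; per-NOTIFY delivery does not.

committed = []          # rows visible to queries, in commit order
last_sent = 0           # high-water mark used by the old batching approach
delivered_batch = []    # what batch polling ends up delivering
delivered_notify = []   # what per-NOTIFY delivery ends up delivering

def batch_query(after_id):
    # Old behavior: fetch every *committed* row with id > after_id.
    return sorted(r for r in committed if r > after_id)

# Writer A is assigned id 5, writer B id 6; B's transaction commits first.
committed.append(6)

# A batch send runs now: it sees only id 6 and advances the high-water mark.
batch = batch_query(last_sent)
delivered_batch.extend(batch)
last_sent = max(batch)            # last_sent == 6

# Writer A's transaction commits id 5 afterwards.
committed.append(5)

# Next batch send: "id > 6" can never match id 5, so it is lost forever.
delivered_batch.extend(batch_query(last_sent))

# Per-NOTIFY delivery instead handles exactly the id named by each NOTIFY,
# in the order the NOTIFYs arrive (commit order: 6, then 5).
for notify_id in (6, 5):
    delivered_notify.extend(r for r in committed if r == notify_id)

print(delivered_batch)    # [6]    -- id 5 was skipped
print(delivered_notify)   # [6, 5] -- out of numeric order, but complete
```

The delivery order [6, 5] is what the commit message means by "out of numeric order": it matches commit order rather than id order.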
parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+ parser.add_argument('-i', '--id', type=int, default=0, help="Start from given log id.")
group = parser.add_mutually_exclusive_group()
group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
+ if args.id:
+ last_log_id = args.id - 1
+
def on_message(ev):
global filters
global ws
# push_events call if there are more log rows to send.
def push_events ws, notify_id
begin
- if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id
- # This notify is for a row we've handled already.
- return
- end
-
# Must have at least one filter set up to receive events
if ws.filters.length > 0
# Start with log rows readable by user, sorted in ascending order
param_out = []
if !ws.last_log_id.nil?
- # Client is only interested in log rows that are newer than the
- # last log row seen by the client.
+ # We are catching up from some starting point.
cond_id = "logs.id > ?"
param_out << ws.last_log_id
elsif !notify_id.nil?
- # No last log id, so look at rows starting with notify id
- cond_id = "logs.id >= ?"
+ # Get new row being notified.
+ cond_id = "logs.id = ?"
param_out << notify_id
else
# No log id to start from, nothing to do, return
logs.limit(limit).each do |l|
ws.send(l.as_api_response.to_json)
# Number of rows returned was capped by limit(), we need to schedule
# another query to get more logs (will start from last_log_id
# reported by current query)
+ ws.last_log_id = lastid
EventMachine::next_tick do
+ push_events ws, notify_id
- elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
- # Number of rows returned was less than cap, but the notify id is
- # higher than the last id visible to the client, so update last_log_id
- ws.last_log_id = notify_id
+ elsif !ws.last_log_id.nil?
+ # Done catching up
+ ws.last_log_id = nil
end
elsif !notify_id.nil?
# No filters set up, so just record the sequence number