parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+ parser.add_argument('-i', '--id', type=int, default="", help="Start from given log id.")
group = parser.add_mutually_exclusive_group()
group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
else:
last_log_id = None
+ if args.id:
+ last_log_id = args.id-1
+
def on_message(ev):
global filters
global ws
require 'faye/websocket'
require 'record_filters'
require 'load_param'
+require 'set'
# Patch in user, last_log_id and filters fields into the Faye::Websocket class.
module Faye
attr_accessor :user
attr_accessor :last_log_id
attr_accessor :filters
+ attr_accessor :sent_ids
+ attr_accessor :notify_queue
end
end
# push_events call if there are more log rows to send.
def push_events ws, notify_id
begin
- if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id
- # This notify is for a row we've handled already.
- return
- end
-
# Must have at least one filter set up to receive events
if ws.filters.length > 0
# Start with log rows readable by user, sorted in ascending order
cond_out = []
param_out = []
- if !ws.last_log_id.nil?
- # Client is only interested in log rows that are newer than the
- # last log row seen by the client.
+ if not notify_id.nil?
+ ws.notify_queue.unshift notify_id
+ end
+
+ if not ws.last_log_id.nil?
+ # We are catching up from some starting point.
cond_id = "logs.id > ?"
param_out << ws.last_log_id
- elsif !notify_id.nil?
- # No last log id, so look at rows starting with notify id
- cond_id = "logs.id >= ?"
- param_out << notify_id
+ elsif ws.notify_queue.length > 0
+ # Get next row being notified.
+ cond_id = "logs.id = ?"
+ param_out << ws.notify_queue.pop
else
# No log id to start from, nothing to do, return
return
count = 0
limit = 10
+ lastid = nil
logs.limit(limit).each do |l|
- ws.send(l.as_api_response.to_json)
- ws.last_log_id = l.id
+ if not ws.sent_ids.include?(l.id)
+ # only send if not a duplicate
+ ws.send(l.as_api_response.to_json)
+ end
+ if not ws.last_log_id.nil?
+ # record ids only when sending "catchup" messages, not notifies
+ ws.sent_ids << l.id
+ end
+ lastid = l.id
count += 1
end
# Number of rows returned was capped by limit(), we need to schedule
# another query to get more logs (will start from last_log_id
# reported by current query)
+ ws.last_log_id = lastid
+ EventMachine::next_tick do
+ push_events ws, nil
+ end
+ elsif !ws.last_log_id.nil?
+ # Done catching up
+ ws.last_log_id = nil
+ end
+
+ if ws.notify_queue.length > 0
EventMachine::next_tick do
push_events ws, nil
end
- elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
- # Number of rows returned was less than cap, but the notify id is
- # higher than the last id visible to the client, so update last_log_id
- ws.last_log_id = notify_id
end
- elsif !notify_id.nil?
- # No filters set up, so just record the sequence number
- ws.last_log_id = notify_id
end
rescue ArgumentError => e
# There was some kind of user error.
# Set or reset the last_log_id. The event bus only reports events
# for rows that come after last_log_id.
ws.last_log_id = p[:last_log_id].to_i
+ # Reset sent_ids for consistency
+ # (always re-deliver all matching messages following last_log_id)
+ ws.sent_ids = Set.new
end
if ws.filters.length < MAX_FILTERS
ws.user = current_user
ws.filters = []
ws.last_log_id = nil
+ ws.sent_ids = Set.new
+ ws.notify_queue = Array.new
# Subscribe to internal postgres notifications through @channel. This will
# call push_events when a notification comes through.