attr_accessor :last_log_id
attr_accessor :filters
attr_accessor :sent_ids
+ attr_accessor :notify_queue
end
end
cond_out = []
param_out = []
- if !ws.last_log_id.nil?
+ if not notify_id.nil?
+ ws.notify_queue.unshift notify_id
+ end
+
+ if not ws.last_log_id.nil?
# We are catching up from some starting point.
cond_id = "logs.id > ?"
param_out << ws.last_log_id
- elsif !notify_id.nil?
- # Get new row being notified.
+ elsif ws.notify_queue.length > 0
+ # Get next row being notified.
cond_id = "logs.id = ?"
- param_out << notify_id
+ param_out << ws.notify_queue.pop
else
# No log id to start from, nothing to do, return
return
ws.send(l.as_api_response.to_json)
end
if not ws.last_log_id.nil?
- # only record ids from "catchup" messages and not notifies
+ # record ids only when sending "catchup" messages, not notifies
ws.sent_ids << l.id
end
lastid = l.id
# reported by current query)
ws.last_log_id = lastid
EventMachine::next_tick do
- push_events ws, notify_id
+ push_events ws, nil
end
elsif !ws.last_log_id.nil?
# Done catching up
ws.last_log_id = nil
end
+
+ if ws.notify_queue.length > 0
+ EventMachine::next_tick do
+ push_events ws, nil
+ end
+ end
end
rescue ArgumentError => e
# There was some kind of user error.
ws.filters = []
ws.last_log_id = nil
ws.sent_ids = Set.new
+ ws.notify_queue = Array.new
# Subscribe to internal postgres notifications through @channel. This will
# call push_events when a notification comes through.