3 require 'faye/websocket'
4 require 'record_filters'
10 attr_accessor :last_log_id
11 attr_accessor :filters
33 include CurrentApiClient
37 @channel = EventMachine::Channel.new
44 ws.send '{"error":"Not logged in"}'
49 ws.user = current_user
53 sub = @channel.subscribe do |msg|
54 # Must have at least one filter set up to receive events
55 if ws.filters.length > 0
57 # Start with log rows readable by user, sorted in ascending order
58 logs = Log.readable_by(ws.user).order("id asc")
61 # Only get log rows that are new
62 logs = logs.where("log.id > ? and log.id <= ?", ws.last_log_id, msg.to_i)
64 # No last log id, so only look at the most recently changed row
65 logs = logs.where("log.id = ?", msg.to_i)
68 # Record the most recent row
69 ws.last_log_id = msg.to_i
71 # Now process filters provided by client
74 ws.filters.each do |filter|
75 ft = record_filters filter.filters
76 cond_out += ft[:cond_out]
77 param_out += ft[:param_out]
80 # Add filters to query
82 logs = logs.where(cond_out.join(' OR '), *param_out)
85 # Finally execute query and send matching rows
87 ws.send(l.as_api_response.to_json)
90 # No filters set up, so just record the sequence number via the last_log_id writer
91 ws.last_log_id = msg.to_i
95 ws.on :message do |event|
96 p = oj.parse(event.data)
97 if p[:method] == 'subscribe'
98 if p[:starting_log_id]
99 ws.last_log_id = p[:starting_log_id].to_i
101 ws.filters.push(Filter.new p)
105 ws.on :close do |event|
106 @channel.unsubscribe sub
114 # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
115 ActiveRecord::Base.connection_pool.with_connection do |connection|
116 conn = connection.instance_variable_get(:@connection)
118 conn.async_exec "LISTEN logs"
120 conn.wait_for_notify do |channel, pid, payload|
121 @channel.push payload
125 # Don't want the connection to still be listening once we return
126 # it to the pool - could result in weird behavior for the next
127 # thread to check it out.
128 conn.async_exec "UNLISTEN *"