3 require 'faye/websocket'
4 require 'record_filters'
10 attr_accessor :last_log_id
11 attr_accessor :filters
33 include CurrentApiClient
37 @channel = EventMachine::Channel.new
44 ws.send ({status: 401, message: "Valid API token required"}.to_json)
49 ws.user = current_user
53 sub = @channel.subscribe do |msg|
54 # Must have at least one filter set up to receive events
55 if ws.filters.length > 0
57 # Start with log rows readable by user, sorted in ascending order
58 logs = Log.readable_by(ws.user).order("id asc")
61 # Only get log rows that are new
62 logs = logs.where("log.id > ? and log.id <= ?", ws.last_log_id, msg.to_i)
64 # No last log id, so only look at the most recently changed row
65 logs = logs.where("log.id = ?", msg.to_i)
68 # Record the most recent row
69 ws.last_log_id = msg.to_i
71 # Now process filters provided by client
74 ws.filters.each do |filter|
75 ft = record_filters filter.filters
76 cond_out += ft[:cond_out]
77 param_out += ft[:param_out]
80 # Add filters to query
82 logs = logs.where(cond_out.join(' OR '), *param_out)
85 # Finally execute query and send matching rows
87 ws.send(l.as_api_response.to_json)
90 # No filters set up, so just record the sequence number
91 ws.last_log_id.nil = msg.to_i
95 ws.on :message do |event|
96 p = Oj.load event.data
97 if p["method"] == 'subscribe'
98 if p["starting_log_id"]
99 ws.last_log_id = p["starting_log_id"].to_i
101 ws.filters.push(Filter.new p)
102 ws.send ({status: 200, message: 'subscribe ok'}.to_json)
106 ws.on :close do |event|
107 @channel.unsubscribe sub
115 # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
116 ActiveRecord::Base.connection_pool.with_connection do |connection|
117 conn = connection.instance_variable_get(:@connection)
119 conn.async_exec "LISTEN logs"
121 conn.wait_for_notify do |channel, pid, payload|
122 @channel.push payload
126 # Don't want the connection to still be listening once we return
127 # it to the pool - could result in weird behavior for the next
128 # thread to check it out.
129 conn.async_exec "UNLISTEN *"