require 'record_filters'
require 'load_param'
+# Patch in user, last_log_id and filters fields into the Faye::WebSocket class.
module Faye
  # Reopened so each websocket connection can carry the per-connection state
  # that EventBus reads and writes.
  class WebSocket
    # user::        assigned from current_user when a connection is accepted.
    # last_log_id:: id of the last log row recorded for (or sent to) this
    #               connection; nil until one is known.
    # filters::     list of filters registered on this connection; EventBus
    #               starts it as an empty array.
    attr_accessor :user, :last_log_id, :filters
  end
end
+# Store the filters supplied by the user that will be applied to the logs table
+# to determine which events to return to the listener.
class Filter
# Mixes the LoadParam module (required from 'load_param' at the top of the
# file) into Filter.
include LoadParam
# NOTE(review): the `end` directly below has no matching opener in the lines
# visible here; presumably part of the class body is elided from this view.
# Confirm against the full file before editing this class.
end
end
+# Manages websocket connections, accepts subscription messages and publishes
+# log table events.
class EventBus
include CurrentApiClient
include RecordFilters
Log
end
+ # Initialize EventBus. Takes no parameters.
def initialize
  # Lock object kept on the bus; its users are outside the lines shown here.
  @mtx = Mutex.new

  # Channel that each connection subscribes to in on_connect; messages
  # delivered on it are handed to push_events.
  @channel = EventMachine::Channel.new

  # Source of filter ids; alloc_filter_id increments it before returning.
  @filter_id_counter = 0
end
+ # Allocate a new filter id
def alloc_filter_id
  # Bump the counter exactly once and return the new value. The counter is
  # initialised to 0, so the first id handed out is 1.
  @filter_id_counter += 1
end
- def push_events ws, msg = nil
+ # Push out any pending events to the connection +ws+
+ # +id+ the id of the most recent row in the log table, may be nil
+ def push_events ws, id = nil
# NOTE(review): this span still carries unresolved patch markers. Each line
# starting with "-" is the older spelling using +msg+; the "+" line that
# follows it is the replacement using +id+.
begin
# Must have at least one filter set up to receive events
if ws.filters.length > 0
if ws.last_log_id
# Only interested in log rows that are new
# NOTE(review): +logs+ is not assigned anywhere in the lines visible here
# before this call; confirm the base query against the full file.
logs = logs.where("logs.id > ?", ws.last_log_id)
- elsif msg
+ elsif id
# No last log id, so only look at the most recently changed row
- logs = logs.where("logs.id = ?", msg.to_i)
+ logs = logs.where("logs.id = ?", id.to_i)
else
# Neither a previously recorded id nor a new id was given: nothing to send.
return
end
# Send the row to the client as JSON, then advance the connection's
# last_log_id to that row's id.
# NOTE(review): +l+ is not bound in the visible lines; presumably a per-row
# iteration encloses these two statements (the `end` below would close it).
# Confirm against the full file.
ws.send(l.as_api_response.to_json)
ws.last_log_id = l.id
end
- elsif msg
+ elsif id
# No filters set up, so just record the sequence number
- ws.last_log_id = msg.to_i
+ ws.last_log_id = id.to_i
end
# NOTE(review): rescuing Exception also traps SignalException and SystemExit.
# The bound +e+ is unused ($! is interpolated instead). The error is printed
# and not re-raised.
rescue Exception => e
puts "Error publishing event: #{$!}"
end
end
+ # Constant maximum number of filters, to avoid silly huge database queries.
MAX_FILTERS = 16
+ # Called by RackSocket when a new websocket connection has been established.
def on_connect ws
+
+ # Disconnect if no valid API token.
+ # current_user is included from CurrentApiClient
if not current_user
ws.send ({status: 401, message: "Valid API token required"}.to_json)
ws.close
return
end
+ # Initialize our custom fields on the websocket connection object.
ws.user = current_user
ws.filters = []
ws.last_log_id = nil
+ # Subscribe to internal postgres notifications through @channel. This will
+ # call push_events when a notification comes through.
sub = @channel.subscribe do |msg|
push_events ws, msg
end
+ # Set up callback for inbound message dispatch.
ws.on :message do |event|
begin
p = (Oj.load event.data).symbolize_keys