module WebSocket
class Driver
- class Hybi < Driver
- alias_method :_frame, :frame
+ class Server
+ alias_method :_write, :write
- def frame(data, type = nil, code = nil)
+ def write(data)
# Most of the sending activity will be from the thread set up in
# on_connect. However, there is also some automatic activity in the
- # form of ping/pong messages, so ensure that the frame method (which
- # actually packs and sends one complete message to the underlying
- # socket) can only be called by one thread at a time.
- @socket.frame_mtx.synchronize do
- _frame(data, type, code)
+ # form of ping/pong messages, so ensure that the write method used to
+ # send one complete message to the underlying socket can only be
+ # called by one thread at a time.
+ self.frame_mtx.synchronize do
+ _write(data)
end
end
end
begin
# Must have at least one filter set up to receive events
if ws.filters.length > 0
- # Start with log rows readable by user, sorted in ascending order
- logs = Log.readable_by(ws.user).order("id asc")
+ # Start with log rows readable by user
+ logs = Log.readable_by(ws.user)
cond_id = nil
cond_out = []
logs = logs.where(cond_id, *param_out)
end
- # Execute query and actually send the matching log rows
- logs.each do |l|
+ # Execute query and actually send the matching log rows. Load
+ # the full log records only when we're ready to send them,
+ # though: otherwise, (1) postgres has to build the whole
+ # result set and return it to us before we can send the first
+ # event, and (2) we store lots of records in memory while
+ # waiting to spool them out to the client. Both of these are
+ # troublesome when log records are large (e.g., a collection
+ # update contains both old and new manifest_text).
+ #
+ # Note: find_each implies order('id asc'), which is what we
+ # want.
+ logs.select('logs.id').find_each do |l|
if not ws.sent_ids.include?(l.id)
# only send if not a duplicate
- ws.send(l.as_api_response.to_json)
+ ws.send(Log.find(l.id).as_api_response.to_json)
end
if not ws.last_log_id.nil?
# record ids only when sending "catchup" messages, not notifies
ws.sent_ids = Set.new
end
- if ws.filters.length < MAX_FILTERS
+ if ws.filters.length < Rails.configuration.websocket_max_filters
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
ws.filters << filter
# Send any pending events
push_events ws, nil
else
- ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+              ws.send({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
end
elsif p[:method] == 'unsubscribe'
end
end
- # Constant maximum number of filters, to avoid silly huge database queries.
- MAX_FILTERS = 16
- MAX_NOTIFY_BACKLOG = 1000
- MAX_CONNECTIONS = 500
-
def overloaded?
@mtx.synchronize do
- @connection_count >= MAX_CONNECTIONS
+ @connection_count >= Rails.configuration.websocket_max_connections
end
end
# Subscribe to internal postgres notifications through @channel and
# forward them to the thread associated with the connection.
sub = @channel.subscribe do |msg|
- if msg != :term
- ws.queue << [:notify, msg]
- else
+ if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
+        ws.send({status: 500, message: 'Notify backlog too long'}.to_json)
ws.close
+ @channel.unsubscribe sub
+ ws.queue.clear
+ else
+ ws.queue << [:notify, msg]
end
end
# connections may not complete in a timely manner.
Thread.new do
# Loop and react to socket events.
- loop do
- eventType, msg = ws.queue.pop
- if ws.queue.length > MAX_NOTIFY_BACKLOG
- ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
- ws.close
- else
+ begin
+ loop do
+ eventType, msg = ws.queue.pop
if eventType == :message
handle_message ws, msg
elsif eventType == :notify
break
end
end
- end
- @mtx.synchronize do
- @connection_count -= 1
+ ensure
+ @mtx.synchronize do
+ @connection_count -= 1
+ end
end
end