module WebSocket
class Driver
# Serialize outgoing messages on a websocket driver object: the existing send
# method is kept reachable under an underscore-prefixed alias, and the public
# name is redefined to take a mutex before delegating to that alias.
# NOTE(review): alias_method needs the aliased method to exist already, so
# this presumably reopens a class provided by the websocket driver library;
# frame_mtx is not defined in this view and is expected to return a
# Mutex-like object responding to #synchronize -- confirm both.
- class Hybi < Driver
- alias_method :_frame, :frame
+ class Server
+ alias_method :_write, :write
- def frame(data, type = nil, code = nil)
+ def write(data)
# Most of the sending activity will be from the thread set up in
# on_connect. However, there is also some automatic activity in the
- # form of ping/pong messages, so ensure that the frame method (which
- # actually packs and sends one complete message to the underlying
- # socket) can only be called by one thread at a time.
- @socket.frame_mtx.synchronize do
- _frame(data, type, code)
+ # form of ping/pong messages, so ensure that the write method used to
+ # send one complete message to the underlying socket can only be
+ # called by one thread at a time.
+ self.frame_mtx.synchronize do
+ _write(data)
end
end
end
ws.sent_ids = Set.new
end
- if ws.filters.length < MAX_FILTERS
+ if ws.filters.length < Rails.configuration.websocket_max_filters
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
ws.filters << filter
# Send any pending events
push_events ws, nil
else
- ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+ ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
end
elsif p[:method] == 'unsubscribe'
end
end
- # Constant maximum number of filters, to avoid silly huge database queries.
- MAX_FILTERS = 16
- MAX_NOTIFY_BACKLOG = 1000
- MAX_CONNECTIONS = 500
-
# Tell whether the number of tracked connections has reached the configured
# limit. The counter is read while holding @mtx.
#
# Returns true when @connection_count is greater than or equal to the limit,
# false otherwise.
def overloaded?
@mtx.synchronize do
- @connection_count >= MAX_CONNECTIONS
+ @connection_count >= Rails.configuration.websocket_max_connections
end
end
# Subscribe to internal postgres notifications through @channel and
# forward them to the thread associated with the connection.
sub = @channel.subscribe do |msg|
- if msg != :term
- ws.queue << [:notify, msg]
- else
+ if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
+ ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
ws.close
+ @channel.unsubscribe sub
+ ws.queue.clear
+ else
+ ws.queue << [:notify, msg]
end
end
# connections may not complete in a timely manner.
Thread.new do
# Loop and react to socket events.
- loop do
- eventType, msg = ws.queue.pop
- if ws.queue.length > MAX_NOTIFY_BACKLOG
- ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
- ws.close
- else
+ begin
+ loop do
+ eventType, msg = ws.queue.pop
if eventType == :message
handle_message ws, msg
elsif eventType == :notify
break
end
end
- end
- @mtx.synchronize do
- @connection_count -= 1
+ ensure
+ @mtx.synchronize do
+ @connection_count -= 1
+ end
end
end