module WebSocket
class Driver
# Reopens the websocket driver to serialize outgoing socket writes.
# The removed (-) lines aliased Hybi#frame to _frame and wrapped it in
# @socket.frame_mtx.synchronize; the added (+) lines alias Server#write to
# _write and wrap it in self.frame_mtx.synchronize instead.
# NOTE(review): frame_mtx is not defined anywhere in this view -- confirm
# that the object receiving #write actually exposes it.
- class Hybi < Driver
- alias_method :_frame, :frame
+ class Server
+ alias_method :_write, :write
- def frame(data, type = nil, code = nil)
+ def write(data)
# Most of the sending activity will be from the thread set up in
# on_connect. However, there is also some automatic activity in the
- # form of ping/pong messages, so ensure that the frame method (which
- # actually packs and sends one complete message to the underlying
- # socket) can only be called by one thread at a time.
- @socket.frame_mtx.synchronize do
- _frame(data, type, code)
+ # form of ping/pong messages, so ensure that the write method used to
+ # send one complete message to the underlying socket can only be
+ # called by one thread at a time.
+ self.frame_mtx.synchronize do
+ _write(data)
end
end
end
@connection_count = 0
end
# Serialize +obj+ with Oj.dump in :compat mode and pass the result to
# +ws.send+. The other added (+) lines in this change call this helper in
# place of the removed `ws.send(....to_json)` calls.
+ def send_message(ws, obj)
+ ws.send(Oj.dump(obj, mode: :compat))
+ end
+
# Push out any pending events to the connection +ws+.
# +notify_id+ is the id of the most recent row in the log table; it may be nil.
#
begin
# Must have at least one filter set up to receive events
if ws.filters.length > 0
- # Start with log rows readable by user, sorted in ascending order
- logs = Log.readable_by(ws.user).order("id asc")
+ # Start with log rows readable by user
+ logs = Log.readable_by(ws.user)
cond_id = nil
cond_out = []
logs = logs.where(cond_id, *param_out)
end
- # Execute query and actually send the matching log rows
- logs.each do |l|
+ # Execute query and actually send the matching log rows. Load
+ # the full log records only when we're ready to send them,
+ # though: otherwise, (1) postgres has to build the whole
+ # result set and return it to us before we can send the first
+ # event, and (2) we store lots of records in memory while
+ # waiting to spool them out to the client. Both of these are
+ # troublesome when log records are large (e.g., a collection
+ # update contains both old and new manifest_text).
+ #
+ # Note: find_each implies order('id asc'), which is what we
+ # want.
+ logs.select('logs.id').find_each do |l|
if not ws.sent_ids.include?(l.id)
# only send if not a duplicate
- ws.send(l.as_api_response.to_json)
+ send_message(ws, Log.find(l.id).as_api_response)
end
if not ws.last_log_id.nil?
# record ids only when sending "catchup" messages, not notifies
rescue ArgumentError => e
# There was some kind of user error.
Rails.logger.warn "Error publishing event: #{$!}"
- ws.send ({status: 500, message: $!}.to_json)
+ send_message(ws, {status: 500, message: $!})
ws.close
rescue => e
Rails.logger.warn "Error publishing event: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- ws.send ({status: 500, message: $!}.to_json)
+ send_message(ws, {status: 500, message: $!})
ws.close
# These exceptions typically indicate serious server trouble:
# out of memory issues, database connection problems, etc. Go ahead and
p = (Oj.strict_load event.data).symbolize_keys
filter = Filter.new(p)
rescue Oj::Error => e
- ws.send ({status: 400, message: "malformed request"}.to_json)
+ send_message(ws, {status: 400, message: "malformed request"})
return
end
ws.sent_ids = Set.new
end
- if ws.filters.length < MAX_FILTERS
+ if ws.filters.length < Rails.configuration.websocket_max_filters
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
ws.filters << filter
- ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
+ send_message(ws, {status: 200, message: 'subscribe ok', filter: p})
# Send any pending events
push_events ws, nil
else
- ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+ send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"})
end
elsif p[:method] == 'unsubscribe'
len = ws.filters.length
ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
if ws.filters.length < len
- ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+ send_message(ws, {status: 200, message: 'unsubscribe ok'})
else
- ws.send ({status: 404, message: 'filter not found'}.to_json)
+ send_message(ws, {status: 404, message: 'filter not found'})
end
else
- ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+ send_message(ws, {status: 400, message: "missing or unrecognized method"})
end
rescue => e
Rails.logger.warn "Error handling message: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- ws.send ({status: 500, message: 'error'}.to_json)
+ send_message(ws, {status: 500, message: 'error'})
ws.close
end
end
- # Constant maximum number of filters, to avoid silly huge database queries.
- MAX_FILTERS = 16
- MAX_NOTIFY_BACKLOG = 1000
- MAX_CONNECTIONS = 500
-
# Report whether the connection limit has been reached: the comparison
# @connection_count >= limit is evaluated inside @mtx.synchronize.
# The removed (-) line compared against the MAX_CONNECTIONS constant; the
# added (+) line reads Rails.configuration.websocket_max_connections.
def overloaded?
@mtx.synchronize do
- @connection_count >= MAX_CONNECTIONS
+ @connection_count >= Rails.configuration.websocket_max_connections
end
end
# Disconnect if no valid API token.
# current_user is included from CurrentApiClient
if not current_user
- ws.send ({status: 401, message: "Valid API token required"}.to_json)
- ws.close
+ send_message(ws, {status: 401, message: "Valid API token required"})
+ # Wait for the handshake to complete before closing the
+ # socket. Otherwise, nginx responds with HTTP 502 Bad gateway,
+ # and the client never sees our real error message.
+ ws.on :open do |event|
+ ws.close
+ end
return
end
# Subscribe to internal postgres notifications through @channel and
# forward them to the thread associated with the connection.
sub = @channel.subscribe do |msg|
- if msg != :term
- ws.queue << [:notify, msg]
- else
+ if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
+ send_message(ws, {status: 500, message: 'Notify backlog too long'})
ws.close
+ @channel.unsubscribe sub
+ ws.queue.clear
+ else
+ ws.queue << [:notify, msg]
end
end
# connections may not complete in a timely manner.
Thread.new do
# Loop and react to socket events.
- loop do
- eventType, msg = ws.queue.pop
- if ws.queue.length > MAX_NOTIFY_BACKLOG
- ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
- ws.close
- else
+ begin
+ loop do
+ eventType, msg = ws.queue.pop
if eventType == :message
handle_message ws, msg
elsif eventType == :notify
break
end
end
- end
- @mtx.synchronize do
- @connection_count -= 1
+ ensure
+ @mtx.synchronize do
+ @connection_count -= 1
+ end
+ ActiveRecord::Base.connection.close
end
end