@connection_count = 0
end
+ def send_message(ws, obj)
+ ws.send(Oj.dump(obj, mode: :compat))
+ end
+
# Push out any pending events to the connection +ws+
# +notify_id+ is the id of the most recent row in the log table; it may be nil.
#
#
# Note: find_each implies order('id asc'), which is what we
# want.
- logs.select(:id).find_each do |l|
+ logs.select('logs.id').find_each do |l|
if not ws.sent_ids.include?(l.id)
# only send if not a duplicate
- ws.send(Log.find(l.id).as_api_response.to_json)
+ send_message(ws, Log.find(l.id).as_api_response)
end
if not ws.last_log_id.nil?
# record ids only when sending "catchup" messages, not notifies
rescue ArgumentError => e
# There was some kind of user error.
Rails.logger.warn "Error publishing event: #{$!}"
- ws.send ({status: 500, message: $!}.to_json)
+ send_message(ws, {status: 500, message: $!})
ws.close
rescue => e
Rails.logger.warn "Error publishing event: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- ws.send ({status: 500, message: $!}.to_json)
+ send_message(ws, {status: 500, message: $!})
ws.close
# These exceptions typically indicate serious server trouble:
# out of memory issues, database connection problems, etc. Go ahead and
p = (Oj.strict_load event.data).symbolize_keys
filter = Filter.new(p)
rescue Oj::Error => e
- ws.send ({status: 400, message: "malformed request"}.to_json)
+ send_message(ws, {status: 400, message: "malformed request"})
return
end
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
ws.filters << filter
- ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
+ send_message(ws, {status: 200, message: 'subscribe ok', filter: p})
# Send any pending events
push_events ws, nil
else
- ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
+ send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"})
end
elsif p[:method] == 'unsubscribe'
len = ws.filters.length
ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
if ws.filters.length < len
- ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+ send_message(ws, {status: 200, message: 'unsubscribe ok'})
else
- ws.send ({status: 404, message: 'filter not found'}.to_json)
+ send_message(ws, {status: 404, message: 'filter not found'})
end
else
- ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+ send_message(ws, {status: 400, message: "missing or unrecognized method"})
end
rescue => e
Rails.logger.warn "Error handling message: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- ws.send ({status: 500, message: 'error'}.to_json)
+ send_message(ws, {status: 500, message: 'error'})
ws.close
end
end
# Disconnect if no valid API token.
# current_user is included from CurrentApiClient
if not current_user
- ws.send ({status: 401, message: "Valid API token required"}.to_json)
- ws.close
+ send_message(ws, {status: 401, message: "Valid API token required"})
+ # Wait for the handshake to complete before closing the
+ # socket. Otherwise, nginx responds with HTTP 502 Bad Gateway,
+ # and the client never sees our real error message.
+ ws.on :open do |event|
+ ws.close
+ end
return
end
# forward them to the thread associated with the connection.
sub = @channel.subscribe do |msg|
if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
- ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
+ send_message(ws, {status: 500, message: 'Notify backlog too long'})
ws.close
@channel.unsubscribe sub
ws.queue.clear
@mtx.synchronize do
@connection_count -= 1
end
+ ActiveRecord::Base.connection.close
end
end