+# If any threads raise an unhandled exception, make them all die.
+# We trust a supervisor like runit to restart the server in this case.
+Thread.abort_on_exception = true
+
require 'eventmachine'
require 'oj'
require 'faye/websocket'
# Execute query and actually send the matching log rows
count = 0
- limit = 100
+ limit = 20
logs.limit(limit).each do |l|
ws.send(l.as_api_response.to_json)
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
ws.send ({status: 500, message: 'error'}.to_json)
ws.close
+ # These exceptions typically indicate serious server trouble:
+ # out of memory issues, database connection problems, etc. Go ahead and
+ # crash; we expect that a supervisor service like runit will restart us.
+ raise
end
end
# Handle inbound subscribe or unsubscribe message.
def handle_message ws, event
begin
- # Parse event data as JSON
- p = (Oj.load event.data).symbolize_keys
+ begin
+ # Parse event data as JSON
+ p = (Oj.load event.data).symbolize_keys
+ filter = Filter.new(p)
+ rescue Oj::Error => e
+ ws.send ({status: 400, message: "malformed request"}.to_json)
+ return
+ end
if p[:method] == 'subscribe'
# Handle subscribe event
if ws.filters.length < MAX_FILTERS
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
- ws.filters << Filter.new(p)
+ ws.filters << filter
ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
# Send any pending events
else
ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
end
- rescue Oj::Error => e
- ws.send ({status: 400, message: "malformed request"}.to_json)
rescue => e
Rails.logger.warn "Error handling message: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
@channel.push payload.to_i
end
end
- rescue NoMemoryError
- EventMachine::stop_event_loop
- abort "Out of memory"
ensure
# Don't want the connection to still be listening once we return
# it to the pool - could result in weird behavior for the next