X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/92b83dafab5a3fe68b266e7fdc02f30f1a262b47..acefccc4d506ed4bd3f51d3d88bc3a826b28be76:/services/api/lib/eventbus.rb diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index e7f2bb1310..11b178d985 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -3,10 +3,11 @@ Thread.abort_on_exception = true require 'eventmachine' -require 'oj' require 'faye/websocket' -require 'record_filters' require 'load_param' +require 'oj' +require 'record_filters' +require 'safe_json' require 'set' require 'thread' @@ -78,6 +79,10 @@ class EventBus @connection_count = 0 end + def send_message(ws, obj) + ws.send(SafeJSON.dump(obj)) + end + # Push out any pending events to the connection +ws+ # +notify_id+ the id of the most recent row in the log table, may be nil # @@ -143,10 +148,10 @@ class EventBus # # Note: find_each implies order('id asc'), which is what we # want. - logs.select(:id).find_each do |l| + logs.select('logs.id').find_each do |l| if not ws.sent_ids.include?(l.id) # only send if not a duplicate - ws.send(Log.find(l.id).as_api_response.to_json) + send_message(ws, Log.find(l.id).as_api_response) end if not ws.last_log_id.nil? # record ids only when sending "catchup" messages, not notifies @@ -158,12 +163,12 @@ class EventBus rescue ArgumentError => e # There was some kind of user error. Rails.logger.warn "Error publishing event: #{$!}" - ws.send ({status: 500, message: $!}.to_json) + send_message(ws, {status: 500, message: $!}) ws.close rescue => e Rails.logger.warn "Error publishing event: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" - ws.send ({status: 500, message: $!}.to_json) + send_message(ws, {status: 500, message: $!}) ws.close # These exceptions typically indicate serious server trouble: # out of memory issues, database connection problems, etc. Go ahead and @@ -177,10 +182,10 @@ class EventBus begin begin # Parse event data as JSON - p = (Oj.strict_load event.data).symbolize_keys + p = SafeJSON.load(event.data).symbolize_keys filter = Filter.new(p) rescue Oj::Error => e - ws.send ({status: 400, message: "malformed request"}.to_json) + send_message(ws, {status: 400, message: "malformed request"}) return end @@ -200,12 +205,12 @@ class EventBus # Add a filter. This gets the :filters field which is the same # format as used for regular index queries. ws.filters << filter - ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json) + send_message(ws, {status: 200, message: 'subscribe ok', filter: p}) # Send any pending events push_events ws, nil else - ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json) + send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}) end elsif p[:method] == 'unsubscribe' @@ -214,18 +219,18 @@ class EventBus len = ws.filters.length ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) } if ws.filters.length < len - ws.send ({status: 200, message: 'unsubscribe ok'}.to_json) + send_message(ws, {status: 200, message: 'unsubscribe ok'}) else - ws.send ({status: 404, message: 'filter not found'}.to_json) + send_message(ws, {status: 404, message: 'filter not found'}) end else - ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) + send_message(ws, {status: 400, message: "missing or unrecognized method"}) end rescue => e Rails.logger.warn "Error handling message: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" - ws.send ({status: 500, message: 'error'}.to_json) + send_message(ws, {status: 500, message: 'error'}) ws.close end end @@ -241,8 +246,13 @@ class EventBus # Disconnect if no valid API token. # current_user is included from CurrentApiClient if not current_user - ws.send ({status: 401, message: "Valid API token required"}.to_json) - ws.close + send_message(ws, {status: 401, message: "Valid API token required"}) + # Wait for the handshake to complete before closing the + # socket. Otherwise, nginx responds with HTTP 502 Bad gateway, + # and the client never sees our real error message. + ws.on :open do |event| + ws.close + end return end @@ -262,7 +272,7 @@ class EventBus # forward them to the thread associated with the connection. sub = @channel.subscribe do |msg| if ws.queue.length > Rails.configuration.websocket_max_notify_backlog - ws.send ({status: 500, message: 'Notify backlog too long'}.to_json) + send_message(ws, {status: 500, message: 'Notify backlog too long'}) ws.close @channel.unsubscribe sub ws.queue.clear @@ -307,6 +317,7 @@ class EventBus @mtx.synchronize do @connection_count -= 1 end + ActiveRecord::Base.connection.close end end