X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/31e1d10eb4a54b13f55b3c9638f46032be633ff9..88c382d13b3d6e6f3b03ba0d5139ad9552c3c359:/services/api/lib/eventbus.rb diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index 1754fc0ae9..ac53876122 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -1,3 +1,7 @@ +# If any threads raise an unhandled exception, make them all die. +# We trust a supervisor like runit to restart the server in this case. +Thread.abort_on_exception = true + require 'eventmachine' require 'oj' require 'faye/websocket' @@ -112,7 +116,7 @@ class EventBus # Execute query and actually send the matching log rows count = 0 - limit = 100 + limit = 10 logs.limit(limit).each do |l| ws.send(l.as_api_response.to_json) @@ -124,7 +128,7 @@ class EventBus # Number of rows returned was capped by limit(), we need to schedule # another query to get more logs (will start from last_log_id # reported by current query) - EventMachine::schedule do + EventMachine::next_tick do push_events ws, nil end elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id) @@ -136,19 +140,34 @@ class EventBus # No filters set up, so just record the sequence number ws.last_log_id = notify_id end + rescue ArgumentError => e + # There was some kind of user error. + Rails.logger.warn "Error publishing event: #{$!}" + ws.send ({status: 500, message: $!}.to_json) + ws.close rescue => e Rails.logger.warn "Error publishing event: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" - ws.send ({status: 500, message: 'error'}.to_json) + ws.send ({status: 500, message: $!}.to_json) ws.close + # These exceptions typically indicate serious server trouble: + # out of memory issues, database connection problems, etc. Go ahead and + # crash; we expect that a supervisor service like runit will restart us. + raise end end # Handle inbound subscribe or unsubscribe message. def handle_message ws, event begin - # Parse event data as JSON - p = (Oj.load event.data).symbolize_keys + begin + # Parse event data as JSON + p = (Oj.load event.data).symbolize_keys + filter = Filter.new(p) + rescue Oj::Error => e + ws.send ({status: 400, message: "malformed request"}.to_json) + return + end if p[:method] == 'subscribe' # Handle subscribe event @@ -162,7 +181,7 @@ class EventBus if ws.filters.length < MAX_FILTERS # Add a filter. This gets the :filters field which is the same # format as used for regular index queries. - ws.filters << Filter.new(p) + ws.filters << filter ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json) # Send any pending events @@ -185,8 +204,6 @@ class EventBus else ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) end - rescue Oj::Error => e - ws.send ({status: 400, message: "malformed request"}.to_json) rescue => e Rails.logger.warn "Error handling message: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" @@ -252,9 +269,6 @@ class EventBus @channel.push payload.to_i end end - rescue NoMemoryError - EventMachine::stop_event_loop - abort "Out of memory" ensure # Don't want the connection to still be listening once we return # it to the pool - could result in weird behavior for the next