X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2c16f4fbf9408bb758a0f54cae4058dccef4c2a5..56b3e82324aea74cee083d3831c89eb7b553fb9c:/services/api/lib/eventbus.rb diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index 1754fc0ae9..35671d65b2 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -1,3 +1,7 @@ +# If any threads raise an unhandled exception, make them all die. +# We trust a supervisor like runit to restart the server in this case. +Thread.abort_on_exception = true + require 'eventmachine' require 'oj' require 'faye/websocket' @@ -112,7 +116,7 @@ class EventBus # Execute query and actually send the matching log rows count = 0 - limit = 100 + limit = 20 logs.limit(limit).each do |l| ws.send(l.as_api_response.to_json) @@ -141,14 +145,24 @@ class EventBus Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" ws.send ({status: 500, message: 'error'}.to_json) ws.close + # These exceptions typically indicate serious server trouble: + # out of memory issues, database connection problems, etc. Go ahead and + # crash; we expect that a supervisor service like runit will restart us. + raise end end # Handle inbound subscribe or unsubscribe message. def handle_message ws, event begin - # Parse event data as JSON - p = (Oj.load event.data).symbolize_keys + begin + # Parse event data as JSON + p = (Oj.load event.data).symbolize_keys + filter = Filter.new(p) + rescue Oj::Error => e + ws.send ({status: 400, message: "malformed request"}.to_json) + return + end if p[:method] == 'subscribe' # Handle subscribe event @@ -162,7 +176,7 @@ class EventBus if ws.filters.length < MAX_FILTERS # Add a filter. This gets the :filters field which is the same # format as used for regular index queries. - ws.filters << Filter.new(p) + ws.filters << filter ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json) # Send any pending events @@ -185,8 +199,6 @@ class EventBus else ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) end - rescue Oj::Error => e - ws.send ({status: 400, message: "malformed request"}.to_json) rescue => e Rails.logger.warn "Error handling message: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" @@ -252,9 +264,6 @@ class EventBus @channel.push payload.to_i end end - rescue NoMemoryError - EventMachine::stop_event_loop - abort "Out of memory" ensure # Don't want the connection to still be listening once we return # it to the pool - could result in weird behavior for the next