X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/21ccdae70baaca0c7aeca8542f2a0a431f06c313..017d19d31606b8b313c04fffc33d44592ad9644b:/services/api/lib/eventbus.rb diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index a0c0b272e7..ac53876122 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -1,3 +1,7 @@ +# If any threads raise an unhandled exception, make them all die. +# We trust a supervisor like runit to restart the server in this case. +Thread.abort_on_exception = true + require 'eventmachine' require 'oj' require 'faye/websocket' @@ -51,7 +55,9 @@ class EventBus # Push out any pending events to the connection +ws+ # +notify_id+ the id of the most recent row in the log table, may be nil # - # This accepts a websocket and a notify_id (this is the row id from Postgres LISTEN/NOTIFY, it may nil) + # This accepts a websocket and a notify_id (this is the row id from Postgres + # LISTEN/NOTIFY, it may be nil if called from somewhere else) + # # It queries the database for log rows that are either # a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out # b) if ws.last_log_id is nil, then it queries rows starting with notify_id @@ -110,7 +116,7 @@ class EventBus # Execute query and actually send the matching log rows count = 0 - limit = 100 + limit = 10 logs.limit(limit).each do |l| ws.send(l.as_api_response.to_json) @@ -122,7 +128,9 @@ class EventBus # Number of rows returned was capped by limit(), we need to schedule # another query to get more logs (will start from last_log_id # reported by current query) - @channel.push nil + EventMachine::next_tick do + push_events ws, nil + end elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id) # Number of rows returned was less than cap, but the notify id is # higher than the last id visible to the client, so update last_log_id @@ -132,19 +140,34 @@ class EventBus # No filters set up, so just record the sequence number ws.last_log_id = notify_id end + rescue ArgumentError => e + # There was some kind of user error. + Rails.logger.warn "Error publishing event: #{$!}" + ws.send ({status: 500, message: $!}.to_json) + ws.close rescue => e Rails.logger.warn "Error publishing event: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" - ws.send ({status: 500, message: 'error'}.to_json) + ws.send ({status: 500, message: $!}.to_json) ws.close + # These exceptions typically indicate serious server trouble: + # out of memory issues, database connection problems, etc. Go ahead and + # crash; we expect that a supervisor service like runit will restart us. + raise end end # Handle inbound subscribe or unsubscribe message. def handle_message ws, event begin - # Parse event data as JSON - p = (Oj.load event.data).symbolize_keys + begin + # Parse event data as JSON + p = (Oj.load event.data).symbolize_keys + filter = Filter.new(p) + rescue Oj::Error => e + ws.send ({status: 400, message: "malformed request"}.to_json) + return + end if p[:method] == 'subscribe' # Handle subscribe event @@ -158,7 +181,7 @@ class EventBus if ws.filters.length < MAX_FILTERS # Add a filter. This gets the :filters field which is the same # format as used for regular index queries. - ws.filters << Filter.new(p) + ws.filters << filter ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json) # Send any pending events @@ -181,8 +204,6 @@ class EventBus else ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) end - rescue Oj::Error => e - ws.send ({status: 400, message: "malformed request"}.to_json) rescue => e Rails.logger.warn "Error handling message: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"