X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/74e7f636d3ec2f3a20640b721a3fd76fd23cd788..dfcb25e5e9b8caf92a50c3e00026d3c0131bc2e8:/services/api/lib/eventbus.rb diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index 080138ba4c..35671d65b2 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -1,3 +1,7 @@ +# If any threads raise an unhandled exception, make them all die. +# We trust a supervisor like runit to restart the server in this case. +Thread.abort_on_exception = true + require 'eventmachine' require 'oj' require 'faye/websocket' @@ -51,7 +55,9 @@ class EventBus # Push out any pending events to the connection +ws+ # +notify_id+ the id of the most recent row in the log table, may be nil # - # This accepts a websocket and a notify_id (this is the row id from Postgres LISTEN/NOTIFY, it may nil) + # This accepts a websocket and a notify_id (this is the row id from Postgres + # LISTEN/NOTIFY, it may be nil if called from somewhere else) + # # It queries the database for log rows that are either # a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out # b) if ws.last_log_id is nil, then it queries rows starting with notify_id @@ -110,7 +116,7 @@ class EventBus # Execute query and actually send the matching log rows count = 0 - limit = 100 + limit = 20 logs.limit(limit).each do |l| ws.send(l.as_api_response.to_json) @@ -122,8 +128,10 @@ class EventBus # Number of rows returned was capped by limit(), we need to schedule # another query to get more logs (will start from last_log_id # reported by current query) - @channel.push nil - elsif !notify_id.nil? and notify_id > ws.last_log_id + EventMachine::schedule do + push_events ws, nil + end + elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id) # Number of rows returned was less than cap, but the notify id is # higher than the last id visible to the client, so update last_log_id ws.last_log_id = notify_id @@ -137,14 +145,24 @@ class EventBus Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}" ws.send ({status: 500, message: 'error'}.to_json) ws.close + # These exceptions typically indicate serious server trouble: + # out of memory issues, database connection problems, etc. Go ahead and + # crash; we expect that a supervisor service like runit will restart us. + raise end end # Handle inbound subscribe or unsubscribe message. def handle_message ws, event begin - # Parse event data as JSON - p = (Oj.load event.data).symbolize_keys + begin + # Parse event data as JSON + p = (Oj.load event.data).symbolize_keys + filter = Filter.new(p) + rescue Oj::Error => e + ws.send ({status: 400, message: "malformed request"}.to_json) + return + end if p[:method] == 'subscribe' # Handle subscribe event @@ -158,7 +176,7 @@ class EventBus if ws.filters.length < MAX_FILTERS # Add a filter. This gets the :filters field which is the same # format as used for regular index queries. - ws.filters << Filter.new(p) + ws.filters << filter ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json) # Send any pending events @@ -181,8 +199,6 @@ class EventBus else ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) end - rescue Oj::Error => e - ws.send ({status: 400, message: "malformed request"}.to_json) rescue => e Rails.logger.warn "Error handling message: #{$!}" Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"