projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '8497-datamanager-batchsize-1000' of https://github.com/wtsi-hgi/arvados...
[arvados.git]
/
services
/
api
/
lib
/
eventbus.rb
diff --git
a/services/api/lib/eventbus.rb
b/services/api/lib/eventbus.rb
index 848279f9fda9f3126460c00a8b23239875643272..ac53876122d6b2e74b0d9fed85a56308308465b4 100644
(file)
--- a/
services/api/lib/eventbus.rb
+++ b/
services/api/lib/eventbus.rb
@@
-1,3
+1,7
@@
+# If any threads raise an unhandled exception, make them all die.
+# We trust a supervisor like runit to restart the server in this case.
+Thread.abort_on_exception = true
+
require 'eventmachine'
require 'oj'
require 'faye/websocket'
require 'eventmachine'
require 'oj'
require 'faye/websocket'
@@
-51,7
+55,9
@@
class EventBus
# Push out any pending events to the connection +ws+
# +notify_id+ the id of the most recent row in the log table, may be nil
#
# Push out any pending events to the connection +ws+
# +notify_id+ the id of the most recent row in the log table, may be nil
#
- # This accepts a websocket and a notify_id (this is the row id from Postgres LISTEN/NOTIFY, it may nil)
+ # This accepts a websocket and a notify_id (this is the row id from Postgres
+ # LISTEN/NOTIFY, it may be nil if called from somewhere else)
+ #
# It queries the database for log rows that are either
# a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
# b) if ws.last_log_id is nil, then it queries rows starting with notify_id
# It queries the database for log rows that are either
# a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
# b) if ws.last_log_id is nil, then it queries rows starting with notify_id
@@
-110,7
+116,7
@@
class EventBus
# Execute query and actually send the matching log rows
count = 0
# Execute query and actually send the matching log rows
count = 0
- limit = 10
0
+ limit = 10
logs.limit(limit).each do |l|
ws.send(l.as_api_response.to_json)
logs.limit(limit).each do |l|
ws.send(l.as_api_response.to_json)
@@
-122,7
+128,9
@@
class EventBus
# Number of rows returned was capped by limit(), we need to schedule
# another query to get more logs (will start from last_log_id
# reported by current query)
# Number of rows returned was capped by limit(), we need to schedule
# another query to get more logs (will start from last_log_id
# reported by current query)
- @channel.push nil
+ EventMachine::next_tick do
+ push_events ws, nil
+ end
elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
# Number of rows returned was less than cap, but the notify id is
# higher than the last id visible to the client, so update last_log_id
elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
# Number of rows returned was less than cap, but the notify id is
# higher than the last id visible to the client, so update last_log_id
@@
-132,19
+140,34
@@
class EventBus
# No filters set up, so just record the sequence number
ws.last_log_id = notify_id
end
# No filters set up, so just record the sequence number
ws.last_log_id = notify_id
end
+ rescue ArgumentError => e
+ # There was some kind of user error.
+ Rails.logger.warn "Error publishing event: #{$!}"
+ ws.send ({status: 500, message: $!}.to_json)
+ ws.close
rescue => e
Rails.logger.warn "Error publishing event: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
rescue => e
Rails.logger.warn "Error publishing event: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- ws.send ({status: 500, message:
'error'
}.to_json)
+ ws.send ({status: 500, message:
$!
}.to_json)
ws.close
ws.close
+ # These exceptions typically indicate serious server trouble:
+ # out of memory issues, database connection problems, etc. Go ahead and
+ # crash; we expect that a supervisor service like runit will restart us.
+ raise
end
end
# Handle inbound subscribe or unsubscribe message.
def handle_message ws, event
begin
end
end
# Handle inbound subscribe or unsubscribe message.
def handle_message ws, event
begin
- # Parse event data as JSON
- p = (Oj.load event.data).symbolize_keys
+ begin
+ # Parse event data as JSON
+ p = (Oj.load event.data).symbolize_keys
+ filter = Filter.new(p)
+ rescue Oj::Error => e
+ ws.send ({status: 400, message: "malformed request"}.to_json)
+ return
+ end
if p[:method] == 'subscribe'
# Handle subscribe event
if p[:method] == 'subscribe'
# Handle subscribe event
@@
-158,7
+181,7
@@
class EventBus
if ws.filters.length < MAX_FILTERS
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
if ws.filters.length < MAX_FILTERS
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
- ws.filters <<
Filter.new(p)
+ ws.filters <<
filter
ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
# Send any pending events
ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
# Send any pending events
@@
-181,8
+204,6
@@
class EventBus
else
ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
end
else
ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
end
- rescue Oj::Error => e
- ws.send ({status: 400, message: "malformed request"}.to_json)
rescue => e
Rails.logger.warn "Error handling message: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
rescue => e
Rails.logger.warn "Error handling message: #{$!}"
Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
@@
-248,9
+269,6
@@
class EventBus
@channel.push payload.to_i
end
end
@channel.push payload.to_i
end
end
- rescue NoMemoryError
- EventMachine::stop_event_loop
- abort "Out of memory"
ensure
# Don't want the connection to still be listening once we return
# it to the pool - could result in weird behavior for the next
ensure
# Don't want the connection to still be listening once we return
# it to the pool - could result in weird behavior for the next