From 27b18bf9b168319660bdde4632ac4c3f359666d6 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 20 Jun 2016 14:29:21 -0400 Subject: [PATCH] 9427: Add limits for connections, subscriptions, queued notifications, and database connections to configuration. Check notify queue size on push instead of on pop and close connection immediately. --- services/api/config/application.default.yml | 14 +++++ services/api/config/initializers/eventbus.rb | 13 +++++ services/api/lib/eventbus.rb | 52 +++++++++---------- .../api/test/integration/websocket_test.rb | 8 +-- 4 files changed, 55 insertions(+), 32 deletions(-) diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml index f1c4dd0286..0e005dcb17 100644 --- a/services/api/config/application.default.yml +++ b/services/api/config/application.default.yml @@ -70,6 +70,20 @@ common: # websockets, otherwise none at all. websocket_address: false + # Maximum number of websocket connections allowed + websocket_max_connections: 500 + + # Maximum number of events a single connection can be backlogged + websocket_max_notify_backlog: 1000 + + # Maximum number of subscriptions a single websocket connection can have + # active. + websocket_max_filters: 10 + + # When using multithreaded websocket server, set the size of the database + # connection pool. + websocket_db_pool: 50 + # Git repositories must be readable by api server, or you won't be # able to submit crunch jobs. To pass the test suites, put a clone # of the arvados tree in {git_repositories_dir}/arvados.git or diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb index ea1c210385..7c452c33dd 100644 --- a/services/api/config/initializers/eventbus.rb +++ b/services/api/config/initializers/eventbus.rb @@ -13,6 +13,19 @@ Server::Application.configure do :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only") } Rails.logger.info "Websockets #{ENV['ARVADOS_WEBSOCKETS']}, running at /websocket" + + Rails.application.config.after_initialize do + ActiveRecord::Base.connection_pool.disconnect! + + ActiveSupport.on_load(:active_record) do + config = ActiveRecord::Base.configurations[Rails.env] || + Rails.application.config.database_configuration[Rails.env] + config['pool'] = Rails.application.config.websocket_db_pool + ActiveRecord::Base.establish_connection(config) + Rails.logger.info "Database connection pool size #{Rails.application.config.websocket_db_pool}" + end + end + else Rails.logger.info "Websockets disabled" end diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index 0c5df827d0..aaeebdccf0 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -25,17 +25,17 @@ end module WebSocket class Driver - class Hybi < Driver - alias_method :_frame, :frame + class Server + alias_method :_write, :write - def frame(data, type = nil, code = nil) + def write(data) # Most of the sending activity will be from the thread set up in # on_connect. However, there is also some automatic activity in the - # form of ping/pong messages, so ensure that the frame method (which - # actually packs and sends one complete message to the underlying - # socket) can only be called by one thread at a time. - @socket.frame_mtx.synchronize do - _frame(data, type, code) + # form of ping/pong messages, so ensure that the write method used to + # send one complete message to the underlying socket can only be + # called by one thread at a time. + self.frame_mtx.synchronize do + _write(data) end end end @@ -186,7 +186,7 @@ class EventBus ws.sent_ids = Set.new end - if ws.filters.length < MAX_FILTERS + if ws.filters.length < Rails.configuration.websocket_max_filters # Add a filter. This gets the :filters field which is the same # format as used for regular index queries. ws.filters << filter @@ -195,7 +195,7 @@ class EventBus # Send any pending events push_events ws, nil else - ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json) + ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json) end elsif p[:method] == 'unsubscribe' @@ -220,14 +220,9 @@ class EventBus end end - # Constant maximum number of filters, to avoid silly huge database queries. - MAX_FILTERS = 16 - MAX_NOTIFY_BACKLOG = 1000 - MAX_CONNECTIONS = 500 - def overloaded? @mtx.synchronize do - @connection_count >= MAX_CONNECTIONS + @connection_count >= Rails.configuration.websocket_max_connections end end @@ -256,10 +251,13 @@ class EventBus # Subscribe to internal postgres notifications through @channel and # forward them to the thread associated with the connection. sub = @channel.subscribe do |msg| - if msg != :term - ws.queue << [:notify, msg] - else + if ws.queue.length > Rails.configuration.websocket_max_notify_backlog + ws.send ({status: 500, message: 'Notify backlog too long'}.to_json) ws.close + @channel.unsubscribe sub + ws.queue.clear + else + ws.queue << [:notify, msg] end end @@ -284,12 +282,9 @@ class EventBus # connections may not complete in a timely manner. Thread.new do # Loop and react to socket events. - loop do - eventType, msg = ws.queue.pop - if ws.queue.length > MAX_NOTIFY_BACKLOG - ws.send ({status: 500, message: 'Notify backlog too long'}.to_json) - ws.close - else + begin + loop do + eventType, msg = ws.queue.pop if eventType == :message handle_message ws, msg elsif eventType == :notify @@ -298,9 +293,10 @@ class EventBus break end end - end - @mtx.synchronize do - @connection_count -= 1 + ensure + @mtx.synchronize do + @connection_count -= 1 + end end end diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb index d1b8c34a43..25e7592c39 100644 --- a/services/api/test/integration/websocket_test.rb +++ b/services/api/test/integration/websocket_test.rb @@ -31,7 +31,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest ws.on :open do |event| opened = true if timeout - EM::Timer.new 4 do + EM::Timer.new 8 do too_long = true if close_status.nil? EM.stop_event_loop end @@ -597,10 +597,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest ws.on :message do |event| d = Oj.strict_load event.data case state - when (1..EventBus::MAX_FILTERS) + when (1..Rails.configuration.websocket_max_filters) assert_equal 200, d["status"] state += 1 - when (EventBus::MAX_FILTERS+1) + when (Rails.configuration.websocket_max_filters+1) assert_equal 403, d["status"] ws.close end @@ -608,7 +608,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest end - assert_equal 17, state + assert_equal Rails.configuration.websocket_max_filters+1, state end -- 2.39.5