From a4724fe92e651abb06acf8c5e75184561a55c854 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 30 Apr 2014 10:56:04 -0400 Subject: [PATCH] Unsubscribe message now takes filter definition instead of filter_id, reducing the state both the client and server have to maintain in order to support unsubscribing. Added code comments and updated tests. --- services/api/lib/eventbus.rb | 64 +++++++++++-------- .../api/test/integration/websocket_test.rb | 49 +++++++------- services/api/test/websocket_runner.rb | 2 +- 3 files changed, 65 insertions(+), 50 deletions(-) diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index 8480cf47aa..79315aaf6f 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -20,19 +20,14 @@ class Filter attr_accessor :filters - def initialize p, fid + def initialize p @params = p - @filter_id = fid load_filters_param end def params @params end - - def filter_id - @filter_id - end end # Manages websocket connections, accepts subscription messages and publishes @@ -51,12 +46,6 @@ class EventBus @channel = EventMachine::Channel.new @mtx = Mutex.new @bgthread = false - @filter_id_counter = 0 - end - - # Allocate a new filter id - def alloc_filter_id - @filter_id_counter += 1 end # Push out any pending events to the connection +ws+ @@ -69,7 +58,8 @@ class EventBus logs = Log.readable_by(ws.user).order("id asc") if ws.last_log_id - # Only interested in log rows that are new + # Client is only interested in log rows that are newer than the + # last log row seen by the client. logs = logs.where("logs.id > ?", ws.last_log_id) elsif id # No last log id, so only look at the most recently changed row @@ -92,7 +82,7 @@ class EventBus logs = logs.where(cond_out.join(' OR '), *param_out) end - # Finally execute query and send matching rows + # Finally execute query and actually send the matching log rows logs.each do |l| ws.send(l.as_api_response.to_json) ws.last_log_id = l.id @@ -137,33 +127,41 @@ class EventBus # Set up callback for inbound message dispatch. ws.on :message do |event| begin + # Parse event data as JSON p = (Oj.load event.data).symbolize_keys + if p[:method] == 'subscribe' + # Handle subscribe event + if p[:last_log_id] + # Set or reset the last_log_id. The event bus only reports events + # for rows that come after last_log_id. ws.last_log_id = p[:last_log_id].to_i end if ws.filters.length < MAX_FILTERS - filter_id = alloc_filter_id - ws.filters.push Filter.new(p, filter_id) - ws.send ({status: 200, message: 'subscribe ok', filter_id: filter_id}.to_json) + # Add a filter. This gets the :filters field which is the same + # format as used for regular index queries. + ws.filters << Filter.new(p) + ws.send ({status: 200, message: 'subscribe ok'}.to_json) + + # Send any pending events push_events ws else ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json) end + elsif p[:method] == 'unsubscribe' - if filter_id = p[:filter_id] - filter_id = filter_id.to_i - len = ws.filters.length - ws.filters = ws.filters.select { |f| f.filter_id != filter_id } - if ws.filters.length < len - ws.send ({status: 200, message: 'unsubscribe ok', filter_id: filter_id}.to_json) - else - ws.send ({status: 404, message: 'filter_id not found', filter_id: filter_id}.to_json) - end + # Handle unsubscribe event + + len = ws.filters.length + ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) } + if ws.filters.length < len + ws.send ({status: 200, message: 'unsubscribe ok'}.to_json) else - ws.send ({status: 400, message: 'must provide filter_id'}.to_json) + ws.send ({status: 404, message: 'filter not found'}.to_json) end + else ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) end @@ -177,11 +175,13 @@ class EventBus end end + # Set up socket close callback ws.on :close do |event| @channel.unsubscribe sub ws = nil end + # Start up thread to monitor the Postgres database, if none exists already. @mtx.synchronize do unless @bgthread @bgthread = true @@ -192,6 +192,12 @@ class EventBus begin conn.async_exec "LISTEN logs" while true + # wait_for_notify will block until there is a change + # notification from Postgres about the logs table, then push + # the notification into the EventMachine channel. Each + # websocket connection subscribes to the other end of the + # channel and calls #push_events to actually dispatch the + # events to the client. conn.wait_for_notify do |channel, pid, payload| @channel.push payload end @@ -206,5 +212,9 @@ class EventBus end end end + + # Since EventMachine is an asynchronous event based dispatcher, #on_connect + # does not block but instead returns immediately after having set up the + # websocket and notification channel callbacks. end end diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb index 3bba0ef466..4761800414 100644 --- a/services/api/test/integration/websocket_test.rb +++ b/services/api/test/integration/websocket_test.rb @@ -211,6 +211,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest when 2 assert_equal 200, d["status"] spec = Specimen.create + Trait.create # not part of filters, should not be received human = Human.create state = 3 when 3 @@ -281,6 +282,9 @@ class WebsocketTest < ActionDispatch::IntegrationTest ws.on :open do |event| ws.send ({method: 'subscribe'}.to_json) EM::Timer.new 3 do + # Set a time limit on the test because after unsubscribing the server + # still has to process the next event (and then hopefully correctly + # decides not to send it because we unsubscribed.) ws.close end end @@ -290,15 +294,14 @@ class WebsocketTest < ActionDispatch::IntegrationTest case state when 1 assert_equal 200, d["status"] - filter_id = d["filter_id"] spec = Specimen.create state = 2 when 2 spec_ev_uuid = d["object_uuid"] - ws.send ({method: 'unsubscribe', filter_id: filter_id}.to_json) + ws.send ({method: 'unsubscribe'}.to_json) EM::Timer.new 1 do - Human.create + Specimen.create end state = 3 @@ -316,19 +319,22 @@ class WebsocketTest < ActionDispatch::IntegrationTest assert_equal spec.uuid, spec_ev_uuid end - - test "connect, subscribe, get event, try to unsubscribe with bogus filter_id" do + test "connect, subscribe, get event, unsubscribe with filter" do state = 1 spec = nil spec_ev_uuid = nil - human = nil - human_ev_uuid = nil authorize_with :admin - ws_helper :admin do |ws| + ws_helper :admin, false do |ws| ws.on :open do |event| - ws.send ({method: 'subscribe'}.to_json) + ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json) + EM::Timer.new 3 do + # Set a time limit on the test because after unsubscribing the server + # still has to process the next event (and then hopefully correctly + # decides not to send it because we unsubscribed.) + ws.close + end end ws.on :message do |event| @@ -336,35 +342,33 @@ class WebsocketTest < ActionDispatch::IntegrationTest case state when 1 assert_equal 200, d["status"] - spec = Specimen.create + spec = Human.create state = 2 when 2 spec_ev_uuid = d["object_uuid"] - ws.send ({method: 'unsubscribe', filter_id: 100000}.to_json) + ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json) EM::Timer.new 1 do - human = Human.create + Human.create end state = 3 when 3 - assert_equal 404, d["status"] + assert_equal 200, d["status"] state = 4 when 4 - human_ev_uuid = d["object_uuid"] - ws.close + assert false, "Should not get any more events" end end end assert_not_nil spec - assert_not_nil human assert_equal spec.uuid, spec_ev_uuid - assert_equal human.uuid, human_ev_uuid end - test "connect, subscribe, get event, try to unsubscribe with missing filter_id" do + + test "connect, subscribe, get event, try to unsubscribe with bogus filter" do state = 1 spec = nil spec_ev_uuid = nil @@ -387,7 +391,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest state = 2 when 2 spec_ev_uuid = d["object_uuid"] - ws.send ({method: 'unsubscribe'}.to_json) + ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json) EM::Timer.new 1 do human = Human.create @@ -395,7 +399,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest state = 3 when 3 - assert_equal 400, d["status"] + assert_equal 404, d["status"] state = 4 when 4 human_ev_uuid = d["object_uuid"] @@ -412,6 +416,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest end + test "connected, not subscribed, no event" do authorize_with :admin @@ -532,10 +537,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest ws.on :message do |event| d = Oj.load event.data case state - when (1..16) + when (1..EventBus::MAX_FILTERS) assert_equal 200, d["status"] state += 1 - when 17 + when (EventBus::MAX_FILTERS+1) assert_equal 403, d["status"] ws.close end diff --git a/services/api/test/websocket_runner.rb b/services/api/test/websocket_runner.rb index c35938e882..df72e246a6 100644 --- a/services/api/test/websocket_runner.rb +++ b/services/api/test/websocket_runner.rb @@ -6,7 +6,7 @@ SERVER_PID_PATH = 'tmp/pids/passenger.3002.pid' class WebsocketTestRunner < MiniTest::Unit def _system(*cmd) Bundler.with_clean_env do - if not system({'ARVADOS_WEBSOCKETS' => '1', 'RAILS_ENV' => 'test'}, *cmd) + if not system({'ARVADOS_WEBSOCKETS' => 'ws-only', 'RAILS_ENV' => 'test'}, *cmd) raise RuntimeError, "#{cmd[0]} returned exit code #{$?.exitstatus}" end end -- 2.30.2