9427: Add limits for connections, subscriptions, queued notifications, and
[arvados.git] / services / api / test / integration / websocket_test.rb
index 3bba0ef4666b6722662e604ca9e92a666e8dc09d..25e7592c39577bdbf64be90a03d286b05f0120e4 100644 (file)
@@ -3,7 +3,7 @@ require 'websocket_runner'
 require 'oj'
 require 'database_cleaner'
 
-DatabaseCleaner.strategy = :truncation
+DatabaseCleaner.strategy = :deletion
 
 class WebsocketTest < ActionDispatch::IntegrationTest
   self.use_transactional_fixtures = false
@@ -31,8 +31,8 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       ws.on :open do |event|
         opened = true
         if timeout
-          EM::Timer.new 3 do
-            too_long = true
+          EM::Timer.new 8 do
+            too_long = true if close_status.nil?
             EM.stop_event_loop
           end
         end
@@ -56,7 +56,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
     ws_helper do |ws|
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         status = d["status"]
         ws.close
       end
@@ -75,7 +75,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         status = d["status"]
         ws.close
       end
@@ -84,7 +84,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
     assert_equal 200, status
   end
 
-  test "connect, subscribe, get event" do
+  def subscribe_test
     state = 1
     spec = nil
     ev_uuid = nil
@@ -97,7 +97,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
@@ -115,6 +115,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
     assert_equal spec.uuid, ev_uuid
   end
 
+  test "connect, subscribe, get event" do
+    subscribe_test()
+  end
+
   test "connect, subscribe, get two events" do
     state = 1
     spec = nil
@@ -130,7 +134,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
@@ -142,7 +146,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 3
         when 3
           human_ev_uuid = d["object_uuid"]
+          state = 4
           ws.close
+        when 4
+          assert false, "Should not get any more events"
         end
       end
 
@@ -167,7 +174,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
@@ -176,7 +183,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 2
         when 2
           human_ev_uuid = d["object_uuid"]
+          state = 3
           ws.close
+        when 3
+          assert false, "Should not get any more events"
         end
       end
 
@@ -203,7 +213,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
@@ -211,6 +221,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         when 2
           assert_equal 200, d["status"]
           spec = Specimen.create
+          Trait.create # not part of filters, should not be received
           human = Human.create
           state = 3
         when 3
@@ -218,7 +229,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 4
         when 4
           human_ev_uuid = d["object_uuid"]
+          state = 5
           ws.close
+        when 5
+          assert false, "Should not get any more events"
         end
       end
 
@@ -230,6 +244,42 @@ class WebsocketTest < ActionDispatch::IntegrationTest
     assert_equal human.uuid, human_ev_uuid
   end
 
+
+  test "connect, subscribe, compound filter" do
+    state = 1
+    t1 = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#trait'], ['event_type', '=', 'update']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          t1 = Trait.create("name" => "foo")
+          t1.name = "bar"
+          t1.save!
+          state = 2
+         when 2
+          assert_equal 'update', d['event_type']
+          state = 3
+          ws.close
+        when 3
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_equal 3, state
+    assert_not_nil t1
+  end
+
   test "connect, subscribe, ask events starting at seq num" do
     state = 1
     human = nil
@@ -237,7 +287,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
     authorize_with :admin
 
-    lastid = logs(:log3).id
+    lastid = logs(:admin_changes_specimen).id
     l1 = nil
     l2 = nil
 
@@ -247,7 +297,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
@@ -259,17 +309,21 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         when 3
           l2 = d["object_uuid"]
           assert_not_nil l2, "Unexpected message: #{d}"
+          state = 4
           ws.close
+        when 4
+          assert false, "Should not get any more events"
         end
       end
-
     end
 
-    assert_equal logs(:log4).object_uuid, l1
-    assert_equal logs(:log5).object_uuid, l2
+    expect_next_logs = Log.where('id > ?', lastid).order('id asc')
+    assert_equal expect_next_logs[0].object_uuid, l1
+    assert_equal expect_next_logs[1].object_uuid, l2
   end
 
   test "connect, subscribe, get event, unsubscribe" do
+    slow_test
     state = 1
     spec = nil
     spec_ev_uuid = nil
@@ -281,24 +335,26 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       ws.on :open do |event|
         ws.send ({method: 'subscribe'}.to_json)
         EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
           ws.close
         end
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
-          filter_id = d["filter_id"]
           spec = Specimen.create
           state = 2
         when 2
           spec_ev_uuid = d["object_uuid"]
-          ws.send ({method: 'unsubscribe', filter_id: filter_id}.to_json)
+          ws.send ({method: 'unsubscribe'}.to_json)
 
           EM::Timer.new 1 do
-            Human.create
+            Specimen.create
           end
 
           state = 3
@@ -316,55 +372,58 @@ class WebsocketTest < ActionDispatch::IntegrationTest
     assert_equal spec.uuid, spec_ev_uuid
   end
 
-
-  test "connect, subscribe, get event, try to unsubscribe with bogus filter_id" do
+  test "connect, subscribe, get event, unsubscribe with filter" do
+    slow_test
     state = 1
     spec = nil
     spec_ev_uuid = nil
-    human = nil
-    human_ev_uuid = nil
 
     authorize_with :admin
 
-    ws_helper :admin do |ws|
+    ws_helper :admin, false do |ws|
       ws.on :open do |event|
-        ws.send ({method: 'subscribe'}.to_json)
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+        EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
+          ws.close
+        end
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
-          spec = Specimen.create
+          spec = Human.create
           state = 2
         when 2
           spec_ev_uuid = d["object_uuid"]
-          ws.send ({method: 'unsubscribe', filter_id: 100000}.to_json)
+          ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
 
           EM::Timer.new 1 do
-            human = Human.create
+            Human.create
           end
 
           state = 3
         when 3
-          assert_equal 404, d["status"]
+          assert_equal 200, d["status"]
           state = 4
         when 4
-          human_ev_uuid = d["object_uuid"]
-          ws.close
+          assert false, "Should not get any more events"
         end
       end
 
     end
 
     assert_not_nil spec
-    assert_not_nil human
     assert_equal spec.uuid, spec_ev_uuid
-    assert_equal human.uuid, human_ev_uuid
   end
 
-  test "connect, subscribe, get event, try to unsubscribe with missing filter_id" do
+
+  test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
+    slow_test
     state = 1
     spec = nil
     spec_ev_uuid = nil
@@ -379,7 +438,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
@@ -387,7 +446,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 2
         when 2
           spec_ev_uuid = d["object_uuid"]
-          ws.send ({method: 'unsubscribe'}.to_json)
+          ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
 
           EM::Timer.new 1 do
             human = Human.create
@@ -395,11 +454,14 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
           state = 3
         when 3
-          assert_equal 400, d["status"]
+          assert_equal 404, d["status"]
           state = 4
         when 4
           human_ev_uuid = d["object_uuid"]
+          state = 5
           ws.close
+        when 5
+          assert false, "Should not get any more events"
         end
       end
 
@@ -412,7 +474,9 @@ class WebsocketTest < ActionDispatch::IntegrationTest
   end
 
 
+
   test "connected, not subscribed, no event" do
+    slow_test
     authorize_with :admin
 
     ws_helper :admin, false do |ws|
@@ -433,6 +497,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
   end
 
   test "connected, not authorized to see event" do
+    slow_test
     state = 1
 
     authorize_with :admin
@@ -447,7 +512,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
@@ -471,7 +536,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         status = d["status"]
         ws.close
       end
@@ -489,7 +554,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         status = d["status"]
         ws.close
       end
@@ -507,7 +572,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         status = d["status"]
         ws.close
       end
@@ -530,12 +595,12 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
-        when (1..16)
+        when (1..Rails.configuration.websocket_max_filters)
           assert_equal 200, d["status"]
           state += 1
-        when 17
+        when (Rails.configuration.websocket_max_filters+1)
           assert_equal 403, d["status"]
           ws.close
         end
@@ -543,8 +608,93 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
     end
 
-    assert_equal 17, state
+    assert_equal Rails.configuration.websocket_max_filters+1, state
+
+  end
+
+  test "connect, subscribe, lots of events" do
+    slow_test
+    state = 1
+    event_count = 0
+    log_start = Log.order(:id).last.id
+
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      EM::Timer.new 45 do
+        # Needs a longer timeout than the default
+        ws.close
+      end
+
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          ActiveRecord::Base.transaction do
+            (1..202).each do
+              spec = Specimen.create
+            end
+          end
+          state = 2
+        when 2
+          event_count += 1
+          assert_equal d['id'], event_count+log_start
+          if event_count == 202
+            ws.close
+          end
+        end
+      end
+
+    end
 
+    assert_equal 202, event_count
   end
 
+
+  test "connect, subscribe with invalid filter" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        # test that #6451 is fixed (invalid filter crashes websockets)
+        ws.send ({method: 'subscribe', filters: [['object_blarg', 'is_a', 'arvados#human']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          assert_equal 500, d["status"]
+          state = 3
+          ws.close
+        when 3
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_equal 3, state
+
+    # Try connecting again, ensure that websockets server is still running and
+    # didn't crash per #6451
+    subscribe_test()
+
+  end
+
+
 end