9427: Add limits for connections, subscriptions, queued notifications, and
[arvados.git] / services / api / test / integration / websocket_test.rb
index 8f28fc80a2bc58989fa7dee86ad8ba4b38f92c35..25e7592c39577bdbf64be90a03d286b05f0120e4 100644 (file)
@@ -16,8 +16,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
     DatabaseCleaner.clean
   end
 
-  def ws_helper (token = nil)
+  def ws_helper (token = nil, timeout = true)
+    opened = false
     close_status = nil
+    too_long = false
 
     EM.run {
       if token
@@ -26,86 +28,80 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
       end
 
-      ws.on :close do |event|
-        close_status = [:close, event.code, event.reason]
-        EM.stop_event_loop
+      ws.on :open do |event|
+        opened = true
+        if timeout
+          EM::Timer.new 8 do
+            too_long = true if close_status.nil?
+            EM.stop_event_loop
+          end
+        end
       end
 
-      EM::Timer.new 3 do
+      ws.on :close do |event|
+        close_status = [:close, event.code, event.reason]
         EM.stop_event_loop
       end
 
       yield ws
     }
 
-    assert_not_nil close_status, "Test took too long"
-    assert_equal 1000, close_status[1], "Server closed the connection unexpectedly (check server log for errors)"
+    assert opened, "Should have opened web socket"
+    assert (not too_long), "Test took too long"
+    assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
   end
 
   test "connect with no token" do
-    opened = false
     status = nil
 
     ws_helper do |ws|
-      ws.on :open do |event|
-        opened = true
-      end
-
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         status = d["status"]
         ws.close
       end
     end
 
-    assert opened, "Should have opened web socket"
     assert_equal 401, status
   end
 
 
   test "connect, subscribe and get response" do
-    opened = false
     status = nil
 
     ws_helper :admin do |ws|
       ws.on :open do |event|
-        opened = true
         ws.send ({method: 'subscribe'}.to_json)
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         status = d["status"]
         ws.close
       end
     end
 
-    assert opened, "Should have opened web socket"
     assert_equal 200, status
   end
 
-  test "connect, subscribe, get event" do
-    opened = false
+  def subscribe_test
     state = 1
-    spec_uuid = nil
+    spec = nil
     ev_uuid = nil
 
     authorize_with :admin
 
     ws_helper :admin do |ws|
       ws.on :open do |event|
-        opened = true
         ws.send ({method: 'subscribe'}.to_json)
       end
 
       ws.on :message do |event|
-        d = Oj.load event.data
+        d = Oj.strict_load event.data
         case state
         when 1
           assert_equal 200, d["status"]
           spec = Specimen.create
-          spec.save
-          spec_uuid = spec.uuid
           state = 2
         when 2
           ev_uuid = d["object_uuid"]
@@ -115,9 +111,590 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
     end
 
-    assert opened, "Should have opened web socket"
-    assert_not_nil spec_uuid
-    assert_equal spec_uuid, ev_uuid
+    assert_not_nil spec
+    assert_equal spec.uuid, ev_uuid
+  end
+
+  test "connect, subscribe, get event" do
+    subscribe_test()
+  end
+
+  test "connect, subscribe, get two events" do
+    state = 1
+    spec = nil
+    human = nil
+    spec_ev_uuid = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          state = 3
+        when 3
+          human_ev_uuid = d["object_uuid"]
+          state = 4
+          ws.close
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+  test "connect, subscribe, filter events" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          human_ev_uuid = d["object_uuid"]
+          state = 3
+          ws.close
+        when 3
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil human
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+
+  test "connect, subscribe, multiple filters" do
+    state = 1
+    spec = nil
+    human = nil
+    spec_ev_uuid = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          state = 2
+        when 2
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          Trait.create # not part of filters, should not be received
+          human = Human.create
+          state = 3
+        when 3
+          spec_ev_uuid = d["object_uuid"]
+          state = 4
+        when 4
+          human_ev_uuid = d["object_uuid"]
+          state = 5
+          ws.close
+        when 5
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+
+  test "connect, subscribe, compound filter" do
+    state = 1
+    t1 = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#trait'], ['event_type', '=', 'update']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          t1 = Trait.create("name" => "foo")
+          t1.name = "bar"
+          t1.save!
+          state = 2
+         when 2
+          assert_equal 'update', d['event_type']
+          state = 3
+          ws.close
+        when 3
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_equal 3, state
+    assert_not_nil t1
+  end
+
+  test "connect, subscribe, ask events starting at seq num" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    lastid = logs(:admin_changes_specimen).id
+    l1 = nil
+    l2 = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          state = 2
+        when 2
+          l1 = d["object_uuid"]
+          assert_not_nil l1, "Unexpected message: #{d}"
+          state = 3
+        when 3
+          l2 = d["object_uuid"]
+          assert_not_nil l2, "Unexpected message: #{d}"
+          state = 4
+          ws.close
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+    end
+
+    expect_next_logs = Log.where('id > ?', lastid).order('id asc')
+    assert_equal expect_next_logs[0].object_uuid, l1
+    assert_equal expect_next_logs[1].object_uuid, l2
+  end
+
+  test "connect, subscribe, get event, unsubscribe" do
+    slow_test
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+    filter_id = nil
+
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+        EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe'}.to_json)
+
+          EM::Timer.new 1 do
+            Specimen.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 200, d["status"]
+          state = 4
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_equal spec.uuid, spec_ev_uuid
+  end
+
+  test "connect, subscribe, get event, unsubscribe with filter" do
+    slow_test
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+        EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Human.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+
+          EM::Timer.new 1 do
+            Human.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 200, d["status"]
+          state = 4
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_equal spec.uuid, spec_ev_uuid
+  end
+
+
+  test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
+    slow_test
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
+
+          EM::Timer.new 1 do
+            human = Human.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 404, d["status"]
+          state = 4
+        when 4
+          human_ev_uuid = d["object_uuid"]
+          state = 5
+          ws.close
+        when 5
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+
+
+  test "connected, not subscribed, no event" do
+    slow_test
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        EM::Timer.new 1 do
+          Specimen.create
+        end
+
+        EM::Timer.new 3 do
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        assert false, "Should not get any messages, message was #{event.data}"
+      end
+    end
   end
 
+  test "connected, not authorized to see event" do
+    slow_test
+    state = 1
+
+    authorize_with :admin
+
+    ws_helper :active, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+
+        EM::Timer.new 3 do
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          state = 2
+        when 2
+          assert false, "Should not get any messages, message was #{event.data}"
+        end
+      end
+
+    end
+
+  end
+
+  test "connect, try bogus method" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'frobnabble'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+  test "connect, missing method" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({fizzbuzz: 'frobnabble'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+  test "connect, send malformed request" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send '<XML4EVER></XML4EVER>'
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+
+  test "connect, try subscribe too many filters" do
+    state = 1
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        (1..17).each do |i|
+          ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when (1..Rails.configuration.websocket_max_filters)
+          assert_equal 200, d["status"]
+          state += 1
+        when (Rails.configuration.websocket_max_filters+1)
+          assert_equal 403, d["status"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_equal Rails.configuration.websocket_max_filters+1, state
+
+  end
+
+  test "connect, subscribe, lots of events" do
+    slow_test
+    state = 1
+    event_count = 0
+    log_start = Log.order(:id).last.id
+
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      EM::Timer.new 45 do
+        # Needs a longer timeout than the default
+        ws.close
+      end
+
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          ActiveRecord::Base.transaction do
+            (1..202).each do
+              spec = Specimen.create
+            end
+          end
+          state = 2
+        when 2
+          event_count += 1
+          assert_equal d['id'], event_count+log_start
+          if event_count == 202
+            ws.close
+          end
+        end
+      end
+
+    end
+
+    assert_equal 202, event_count
+  end
+
+
+  test "connect, subscribe with invalid filter" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        # test that #6451 is fixed (invalid filter crashes websockets)
+        ws.send ({method: 'subscribe', filters: [['object_blarg', 'is_a', 'arvados#human']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.strict_load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          assert_equal 500, d["status"]
+          state = 3
+          ws.close
+        when 3
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_equal 3, state
+
+    # Try connecting again, ensure that websockets server is still running and
+    # didn't crash per #6451
+    subscribe_test()
+
+  end
+
+
 end