10078: Update recent_collections so that the collection query it is not executed...
[arvados.git] / services / api / lib / eventbus.rb
index c72ad9089fa656914050880343a00c026c2304a0..16bb030941c3033ebf32cb972a645eb821a063d3 100644 (file)
@@ -25,17 +25,17 @@ end
 module WebSocket
   class Driver
 
-    class Hybi < Driver
-      alias_method :_frame, :frame
+    class Server
+      alias_method :_write, :write
 
-      def frame(data, type = nil, code = nil)
+      def write(data)
         # Most of the sending activity will be from the thread set up in
         # on_connect.  However, there is also some automatic activity in the
-        # form of ping/pong messages, so ensure that the frame method (which
-        # actually packs and sends one complete message to the underlying
-        # socket) can only be called by one thread at a time.
-        @socket.frame_mtx.synchronize do
-          _frame(data, type, code)
+        # form of ping/pong messages, so ensure that the write method used to
+        # send one complete message to the underlying socket can only be
+        # called by one thread at a time.
+        self.frame_mtx.synchronize do
+          _write(data)
         end
       end
     end
@@ -75,6 +75,7 @@ class EventBus
     @channel = EventMachine::Channel.new
     @mtx = Mutex.new
     @bgthread = false
+    @connection_count = 0
   end
 
   # Push out any pending events to the connection +ws+
@@ -92,8 +93,8 @@ class EventBus
     begin
       # Must have at least one filter set up to receive events
       if ws.filters.length > 0
-        # Start with log rows readable by user, sorted in ascending order
-        logs = Log.readable_by(ws.user).order("id asc")
+        # Start with log rows readable by user
+        logs = Log.readable_by(ws.user)
 
         cond_id = nil
         cond_out = []
@@ -131,11 +132,21 @@ class EventBus
           logs = logs.where(cond_id, *param_out)
         end
 
-        # Execute query and actually send the matching log rows
-        logs.each do |l|
+        # Execute query and actually send the matching log rows. Load
+        # the full log records only when we're ready to send them,
+        # though: otherwise, (1) postgres has to build the whole
+        # result set and return it to us before we can send the first
+        # event, and (2) we store lots of records in memory while
+        # waiting to spool them out to the client. Both of these are
+        # troublesome when log records are large (e.g., a collection
+        # update contains both old and new manifest_text).
+        #
+        # Note: find_each implies order('id asc'), which is what we
+        # want.
+        logs.select('logs.id').find_each do |l|
           if not ws.sent_ids.include?(l.id)
             # only send if not a duplicate
-            ws.send(l.as_api_response.to_json)
+            ws.send(Log.find(l.id).as_api_response.to_json)
           end
           if not ws.last_log_id.nil?
             # record ids only when sending "catchup" messages, not notifies
@@ -185,7 +196,7 @@ class EventBus
           ws.sent_ids = Set.new
         end
 
-        if ws.filters.length < MAX_FILTERS
+        if ws.filters.length < Rails.configuration.websocket_max_filters
           # Add a filter.  This gets the :filters field which is the same
           # format as used for regular index queries.
           ws.filters << filter
@@ -194,7 +205,7 @@ class EventBus
           # Send any pending events
           push_events ws, nil
         else
-          ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+          ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
         end
 
       elsif p[:method] == 'unsubscribe'
@@ -219,12 +230,14 @@ class EventBus
     end
   end
 
-  # Constant maximum number of filters, to avoid silly huge database queries.
-  MAX_FILTERS = 16
+  def overloaded?
+    @mtx.synchronize do
+      @connection_count >= Rails.configuration.websocket_max_connections
+    end
+  end
 
   # Called by RackSocket when a new websocket connection has been established.
   def on_connect ws
-
     # Disconnect if no valid API token.
     # current_user is included from CurrentApiClient
     if not current_user
@@ -241,13 +254,20 @@ class EventBus
     ws.queue = Queue.new
     ws.frame_mtx = Mutex.new
 
+    @mtx.synchronize do
+      @connection_count += 1
+    end
+
     # Subscribe to internal postgres notifications through @channel and
     # forward them to the thread associated with the connection.
     sub = @channel.subscribe do |msg|
-      if msg != :term
-        ws.queue << [:notify, msg]
-      else
+      if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
+        ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
         ws.close
+        @channel.unsubscribe sub
+        ws.queue.clear
+      else
+        ws.queue << [:notify, msg]
       end
     end
 
@@ -272,12 +292,9 @@ class EventBus
     # connections may not complete in a timely manner.
     Thread.new do
       # Loop and react to socket events.
-      loop do
-        eventType, msg = ws.queue.pop
-        if ws.queue.length > 1000
-          ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
-          ws.close
-        else
+      begin
+        loop do
+          eventType, msg = ws.queue.pop
           if eventType == :message
             handle_message ws, msg
           elsif eventType == :notify
@@ -286,6 +303,10 @@ class EventBus
             break
           end
         end
+      ensure
+        @mtx.synchronize do
+          @connection_count -= 1
+        end
       end
     end