9388: Add ws.notify_queue to ensure that notifies occurring during "catch up"
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 14 Jun 2016 19:48:08 +0000 (15:48 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 14 Jun 2016 19:48:08 +0000 (15:48 -0400)
will get delivered in the correct order after catch up completes.

services/api/lib/eventbus.rb

index cce6798cb0e950dea9412ab6ea601c1897cbf91e..eb7750ff2105019bff502031d933a2e699f042b9 100644 (file)
@@ -16,6 +16,7 @@ module Faye
     attr_accessor :last_log_id
     attr_accessor :filters
     attr_accessor :sent_ids
+    attr_accessor :notify_queue
   end
 end
 
@@ -78,14 +79,18 @@ class EventBus
         cond_out = []
         param_out = []
 
-        if !ws.last_log_id.nil?
+        if not notify_id.nil?
+          ws.notify_queue.unshift notify_id
+        end
+
+        if not ws.last_log_id.nil?
           # We are catching up from some starting point.
           cond_id = "logs.id > ?"
           param_out << ws.last_log_id
-        elsif !notify_id.nil?
-          # Get new row being notified.
+        elsif ws.notify_queue.length > 0
+          # Get next row being notified.
           cond_id = "logs.id = ?"
-          param_out << notify_id
+          param_out << ws.notify_queue.pop
         else
           # No log id to start from, nothing to do, return
           return
@@ -121,7 +126,7 @@ class EventBus
             ws.send(l.as_api_response.to_json)
           end
           if not ws.last_log_id.nil?
-            # only record ids from "catchup" messages and not notifies
+            # record ids only when sending "catchup" messages, not notifies
             ws.sent_ids << l.id
           end
           lastid = l.id
@@ -134,12 +139,18 @@ class EventBus
           # reported by current query)
           ws.last_log_id = lastid
           EventMachine::next_tick do
-            push_events ws, notify_id
+            push_events ws, nil
           end
         elsif !ws.last_log_id.nil?
           # Done catching up
           ws.last_log_id = nil
         end
+
+        if ws.notify_queue.length > 0
+          EventMachine::next_tick do
+            push_events ws, nil
+          end
+        end
       end
     rescue ArgumentError => e
       # There was some kind of user error.
@@ -232,6 +243,7 @@ class EventBus
     ws.filters = []
     ws.last_log_id = nil
     ws.sent_ids = Set.new
+    ws.notify_queue = Array.new
 
     # Subscribe to internal postgres notifications through @channel.  This will
     # call push_events when a notification comes through.