Merge branch '9388-websocket-every-notify' closes #9388
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 14 Jun 2016 20:24:55 +0000 (16:24 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 14 Jun 2016 20:24:55 +0000 (16:24 -0400)
sdk/python/arvados/commands/ws.py
services/api/lib/eventbus.rb

index 347075dffdbb8cca1144f277b46086929ba86ab7..988ec961da889b59463990bf5049cd87a9f31345 100644 (file)
@@ -15,6 +15,7 @@ def main(arguments=None):
     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
     parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+    parser.add_argument('-i', '--id', type=int, default="", help="Start from given log id.")
 
     group = parser.add_mutually_exclusive_group()
     group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
@@ -67,6 +68,9 @@ def main(arguments=None):
     else:
         last_log_id = None
 
+    if args.id:
+        last_log_id = args.id-1
+
     def on_message(ev):
         global filters
         global ws
index 9bf95f57356e4eef7389b585917e26d8ec1973c9..b35da1b0a0e6fb84066e44d1a3d8ca5bde750fa8 100644 (file)
@@ -7,6 +7,7 @@ require 'oj'
 require 'faye/websocket'
 require 'record_filters'
 require 'load_param'
+require 'set'
 
 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
 module Faye
@@ -14,6 +15,8 @@ module Faye
     attr_accessor :user
     attr_accessor :last_log_id
     attr_accessor :filters
+    attr_accessor :sent_ids
+    attr_accessor :notify_queue
   end
 end
 
@@ -67,11 +70,6 @@ class EventBus
   # push_events call if there are more log rows to send.
   def push_events ws, notify_id
     begin
-      if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id
-        # This notify is for a row we've handled already.
-        return
-      end
-
       # Must have at least one filter set up to receive events
       if ws.filters.length > 0
         # Start with log rows readable by user, sorted in ascending order
@@ -81,15 +79,18 @@ class EventBus
         cond_out = []
         param_out = []
 
-        if !ws.last_log_id.nil?
-          # Client is only interested in log rows that are newer than the
-          # last log row seen by the client.
+        if not notify_id.nil?
+          ws.notify_queue.unshift notify_id
+        end
+
+        if not ws.last_log_id.nil?
+          # We are catching up from some starting point.
           cond_id = "logs.id > ?"
           param_out << ws.last_log_id
-        elsif !notify_id.nil?
-          # No last log id, so look at rows starting with notify id
-          cond_id = "logs.id >= ?"
-          param_out << notify_id
+        elsif ws.notify_queue.length > 0
+          # Get next row being notified.
+          cond_id = "logs.id = ?"
+          param_out << ws.notify_queue.pop
         else
           # No log id to start from, nothing to do, return
           return
@@ -118,9 +119,17 @@ class EventBus
         count = 0
         limit = 10
 
+        lastid = nil
         logs.limit(limit).each do |l|
-          ws.send(l.as_api_response.to_json)
-          ws.last_log_id = l.id
+          if not ws.sent_ids.include?(l.id)
+            # only send if not a duplicate
+            ws.send(l.as_api_response.to_json)
+          end
+          if not ws.last_log_id.nil?
+            # record ids only when sending "catchup" messages, not notifies
+            ws.sent_ids << l.id
+          end
+          lastid = l.id
           count += 1
         end
 
@@ -128,17 +137,20 @@ class EventBus
           # Number of rows returned was capped by limit(), we need to schedule
           # another query to get more logs (will start from last_log_id
           # reported by current query)
+          ws.last_log_id = lastid
+          EventMachine::next_tick do
+            push_events ws, nil
+          end
+        elsif !ws.last_log_id.nil?
+          # Done catching up
+          ws.last_log_id = nil
+        end
+
+        if ws.notify_queue.length > 0
           EventMachine::next_tick do
             push_events ws, nil
           end
-        elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
-          # Number of rows returned was less than cap, but the notify id is
-          # higher than the last id visible to the client, so update last_log_id
-          ws.last_log_id = notify_id
         end
-      elsif !notify_id.nil?
-        # No filters set up, so just record the sequence number
-        ws.last_log_id = notify_id
       end
     rescue ArgumentError => e
       # There was some kind of user error.
@@ -176,6 +188,9 @@ class EventBus
           # Set or reset the last_log_id.  The event bus only reports events
           # for rows that come after last_log_id.
           ws.last_log_id = p[:last_log_id].to_i
+          # Reset sent_ids for consistency
+          # (always re-deliver all matching messages following last_log_id)
+          ws.sent_ids = Set.new
         end
 
         if ws.filters.length < MAX_FILTERS
@@ -230,6 +245,8 @@ class EventBus
     ws.user = current_user
     ws.filters = []
     ws.last_log_id = nil
+    ws.sent_ids = Set.new
+    ws.notify_queue = Array.new
 
     # Subscribe to internal postgres notifications through @channel.  This will
     # call push_events when a notification comes through.