9388: Process each notify individually instead attempting to batch them up.
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 9 Jun 2016 21:55:37 +0000 (17:55 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 10 Jun 2016 14:04:39 +0000 (10:04 -0400)
Prior to this commit, websockets used to try to send log events out in batches,
by getting all logs with an id greater than last log that was sent.
Unfortunately, under concurrent database writes, logs from uncommited
transactions may not appear in the query even if logs with larger ids do
appear.  This results in the uncommitted log never being sent out because
subsequent batch sends would not consider logs prior to the last log id that
was sent (which, in this case, is higher than the log that was missed.)

This commit eliminates the batching behavior.  Because NOTIFY includes the log
id of a specific record that was committed, consider only the log record with
that id and process events in the order that the NOTIFY events arrive.  This
means events may be delivered out of numeric order (although they now more
closely reflect the "actual" order, e.g. the order that the events were
actually committed to the database).

"Catch ups" where the client has specified a last_log_id and needs to have past
logs replayed continue to be sent in batches.

sdk/python/arvados/commands/ws.py
services/api/lib/eventbus.rb

index 347075dffdbb8cca1144f277b46086929ba86ab7..988ec961da889b59463990bf5049cd87a9f31345 100644 (file)
@@ -15,6 +15,7 @@ def main(arguments=None):
     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
     parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+    parser.add_argument('-i', '--id', type=int, default="", help="Start from given log id.")
 
     group = parser.add_mutually_exclusive_group()
     group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
@@ -67,6 +68,9 @@ def main(arguments=None):
     else:
         last_log_id = None
 
+    if args.id:
+        last_log_id = args.id-1
+
     def on_message(ev):
         global filters
         global ws
index 9bf95f57356e4eef7389b585917e26d8ec1973c9..4988d59f2e5a9eac9e1570a0adff9f1639015528 100644 (file)
@@ -67,11 +67,6 @@ class EventBus
   # push_events call if there are more log rows to send.
   def push_events ws, notify_id
     begin
-      if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id
-        # This notify is for a row we've handled already.
-        return
-      end
-
       # Must have at least one filter set up to receive events
       if ws.filters.length > 0
         # Start with log rows readable by user, sorted in ascending order
@@ -82,13 +77,12 @@ class EventBus
         param_out = []
 
         if !ws.last_log_id.nil?
-          # Client is only interested in log rows that are newer than the
-          # last log row seen by the client.
+          # We are catching up from some starting point.
           cond_id = "logs.id > ?"
           param_out << ws.last_log_id
         elsif !notify_id.nil?
-          # No last log id, so look at rows starting with notify id
-          cond_id = "logs.id >= ?"
+          # Get new row being notified.
+          cond_id = "logs.id = ?"
           param_out << notify_id
         else
           # No log id to start from, nothing to do, return
@@ -118,9 +112,10 @@ class EventBus
         count = 0
         limit = 10
 
+        lastid = nil
         logs.limit(limit).each do |l|
           ws.send(l.as_api_response.to_json)
-          ws.last_log_id = l.id
+          lastid = l.id
           count += 1
         end
 
@@ -128,13 +123,13 @@ class EventBus
           # Number of rows returned was capped by limit(), we need to schedule
           # another query to get more logs (will start from last_log_id
           # reported by current query)
+          ws.last_log_id = lastid
           EventMachine::next_tick do
-            push_events ws, nil
+            push_events ws, notify_id
           end
-        elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
-          # Number of rows returned was less than cap, but the notify id is
-          # higher than the last id visible to the client, so update last_log_id
-          ws.last_log_id = notify_id
+        elsif !ws.last_log_id.nil?
+          # Done catching up
+          ws.last_log_id = nil
         end
       elsif !notify_id.nil?
         # No filters set up, so just record the sequence number