Merge branch 'master' into 6859-fix-invalid-manifests
[arvados.git] / services / api / lib / eventbus.rb
index 848279f9fda9f3126460c00a8b23239875643272..ac53876122d6b2e74b0d9fed85a56308308465b4 100644 (file)
@@ -1,3 +1,7 @@
+# If any threads raise an unhandled exception, make them all die.
+# We trust a supervisor like runit to restart the server in this case.
+Thread.abort_on_exception = true
+
 require 'eventmachine'
 require 'oj'
 require 'faye/websocket'
@@ -51,7 +55,9 @@ class EventBus
   # Push out any pending events to the connection +ws+
   # +notify_id+  the id of the most recent row in the log table, may be nil
   #
-  # This accepts a websocket and a notify_id (this is the row id from Postgres LISTEN/NOTIFY, it may nil)
+  # This accepts a websocket and a notify_id (this is the row id from Postgres
+  # LISTEN/NOTIFY, it may be nil if called from somewhere else)
+  #
   # It queries the database for log rows that are either
   #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
   #  b) if ws.last_log_id is nil, then it queries rows starting with notify_id
@@ -110,7 +116,7 @@ class EventBus
 
         # Execute query and actually send the matching log rows
         count = 0
-        limit = 100
+        limit = 10
 
         logs.limit(limit).each do |l|
           ws.send(l.as_api_response.to_json)
@@ -122,7 +128,9 @@ class EventBus
           # Number of rows returned was capped by limit(), we need to schedule
           # another query to get more logs (will start from last_log_id
           # reported by current query)
-          @channel.push nil
+          EventMachine::next_tick do
+            push_events ws, nil
+          end
         elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
           # Number of rows returned was less than cap, but the notify id is
           # higher than the last id visible to the client, so update last_log_id
@@ -132,19 +140,34 @@ class EventBus
         # No filters set up, so just record the sequence number
         ws.last_log_id = notify_id
       end
+    rescue ArgumentError => e
+      # There was some kind of user error.
+      Rails.logger.warn "Error publishing event: #{$!}"
+      ws.send ({status: 500, message: $!}.to_json)
+      ws.close
     rescue => e
       Rails.logger.warn "Error publishing event: #{$!}"
       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-      ws.send ({status: 500, message: 'error'}.to_json)
+      ws.send ({status: 500, message: $!}.to_json)
       ws.close
+      # These exceptions typically indicate serious server trouble:
+      # out of memory issues, database connection problems, etc.  Go ahead and
+      # crash; we expect that a supervisor service like runit will restart us.
+      raise
     end
   end
 
   # Handle inbound subscribe or unsubscribe message.
   def handle_message ws, event
     begin
-      # Parse event data as JSON
-      p = (Oj.load event.data).symbolize_keys
+      begin
+        # Parse event data as JSON
+        p = (Oj.load event.data).symbolize_keys
+        filter = Filter.new(p)
+      rescue Oj::Error => e
+        ws.send ({status: 400, message: "malformed request"}.to_json)
+        return
+      end
 
       if p[:method] == 'subscribe'
         # Handle subscribe event
@@ -158,7 +181,7 @@ class EventBus
         if ws.filters.length < MAX_FILTERS
           # Add a filter.  This gets the :filters field which is the same
           # format as used for regular index queries.
-          ws.filters << Filter.new(p)
+          ws.filters << filter
           ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
 
           # Send any pending events
@@ -181,8 +204,6 @@ class EventBus
       else
         ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
       end
-    rescue Oj::Error => e
-      ws.send ({status: 400, message: "malformed request"}.to_json)
     rescue => e
       Rails.logger.warn "Error handling message: #{$!}"
       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
@@ -248,9 +269,6 @@ class EventBus
                   @channel.push payload.to_i
                 end
               end
-            rescue NoMemoryError
-              EventMachine::stop_event_loop
-              abort "Out of memory"
             ensure
               # Don't want the connection to still be listening once we return
               # it to the pool - could result in weird behavior for the next