Merge branch 'master' into 6859-fix-invalid-manifests
[arvados.git] / services / api / lib / eventbus.rb
index 1754fc0ae9b2de1bd7cc70a8d5ed5f1172f481f8..ac53876122d6b2e74b0d9fed85a56308308465b4 100644 (file)
@@ -1,3 +1,7 @@
+# If any threads raise an unhandled exception, make them all die.
+# We trust a supervisor like runit to restart the server in this case.
+Thread.abort_on_exception = true
+
 require 'eventmachine'
 require 'oj'
 require 'faye/websocket'
@@ -112,7 +116,7 @@ class EventBus
 
         # Execute query and actually send the matching log rows
         count = 0
-        limit = 100
+        limit = 10
 
         logs.limit(limit).each do |l|
           ws.send(l.as_api_response.to_json)
@@ -124,7 +128,7 @@ class EventBus
           # Number of rows returned was capped by limit(), we need to schedule
           # another query to get more logs (will start from last_log_id
           # reported by current query)
-          EventMachine::schedule do
+          EventMachine::next_tick do
             push_events ws, nil
           end
         elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
@@ -136,19 +140,34 @@ class EventBus
         # No filters set up, so just record the sequence number
         ws.last_log_id = notify_id
       end
+    rescue ArgumentError => e
+      # There was some kind of user error.
+      Rails.logger.warn "Error publishing event: #{$!}"
+      ws.send ({status: 500, message: $!}.to_json)
+      ws.close
     rescue => e
       Rails.logger.warn "Error publishing event: #{$!}"
       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-      ws.send ({status: 500, message: 'error'}.to_json)
+      ws.send ({status: 500, message: $!}.to_json)
       ws.close
+      # These exceptions typically indicate serious server trouble:
+      # out of memory issues, database connection problems, etc.  Go ahead and
+      # crash; we expect that a supervisor service like runit will restart us.
+      raise
     end
   end
 
   # Handle inbound subscribe or unsubscribe message.
   def handle_message ws, event
     begin
-      # Parse event data as JSON
-      p = (Oj.load event.data).symbolize_keys
+      begin
+        # Parse event data as JSON
+        p = (Oj.load event.data).symbolize_keys
+        filter = Filter.new(p)
+      rescue Oj::Error => e
+        ws.send ({status: 400, message: "malformed request"}.to_json)
+        return
+      end
 
       if p[:method] == 'subscribe'
         # Handle subscribe event
@@ -162,7 +181,7 @@ class EventBus
         if ws.filters.length < MAX_FILTERS
           # Add a filter.  This gets the :filters field which is the same
           # format as used for regular index queries.
-          ws.filters << Filter.new(p)
+          ws.filters << filter
           ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
 
           # Send any pending events
@@ -185,8 +204,6 @@ class EventBus
       else
         ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
       end
-    rescue Oj::Error => e
-      ws.send ({status: 400, message: "malformed request"}.to_json)
     rescue => e
       Rails.logger.warn "Error handling message: #{$!}"
       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
@@ -252,9 +269,6 @@ class EventBus
                   @channel.push payload.to_i
                 end
               end
-            rescue NoMemoryError
-              EventMachine::stop_event_loop
-              abort "Out of memory"
             ensure
               # Don't want the connection to still be listening once we return
               # it to the pool - could result in weird behavior for the next