Merge branch 'master' into 3261-keep-docker-config
[arvados.git] / services / api / lib / eventbus.rb
index 79315aaf6f92e8211f19ee2298d68b570914d4ca..0b8cae2439c287b61b42d9d74274b03fed3ccfa3 100644 (file)
@@ -99,6 +99,57 @@ class EventBus
       end
   end
 
+  # Handle inbound subscribe or unsubscribe message.
+  def handle_message ws, event
+    begin
+      # Parse event data as JSON
+      p = (Oj.load event.data).symbolize_keys
+
+      if p[:method] == 'subscribe'
+        # Handle subscribe event
+
+        if p[:last_log_id]
+          # Set or reset the last_log_id.  The event bus only reports events
+          # for rows that come after last_log_id.
+          ws.last_log_id = p[:last_log_id].to_i
+        end
+
+        if ws.filters.length < MAX_FILTERS
+          # Add a filter.  This gets the :filters field which is the same
+          # format as used for regular index queries.
+          ws.filters << Filter.new(p)
+          ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+
+          # Send any pending events
+          push_events ws
+        else
+          ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+        end
+
+      elsif p[:method] == 'unsubscribe'
+        # Handle unsubscribe event
+
+        len = ws.filters.length
+        ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
+        if ws.filters.length < len
+          ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+        else
+          ws.send ({status: 404, message: 'filter not found'}.to_json)
+        end
+
+      else
+        ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+      end
+    rescue Oj::Error => e
+      ws.send ({status: 400, message: "malformed request"}.to_json)
+    rescue Exception => e
+      puts "Error handling message: #{$!}"
+      puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+      ws.send ({status: 500, message: 'error'}.to_json)
+      ws.close
+    end
+  end
+
   # Constant maximum number of filters, to avoid silly huge database queries.
   MAX_FILTERS = 16
 
@@ -126,53 +177,7 @@ class EventBus
 
     # Set up callback for inbound message dispatch.
     ws.on :message do |event|
-      begin
-        # Parse event data as JSON
-        p = (Oj.load event.data).symbolize_keys
-
-        if p[:method] == 'subscribe'
-          # Handle subscribe event
-
-          if p[:last_log_id]
-            # Set or reset the last_log_id.  The event bus only reports events
-            # for rows that come after last_log_id.
-            ws.last_log_id = p[:last_log_id].to_i
-          end
-
-          if ws.filters.length < MAX_FILTERS
-            # Add a filter.  This gets the :filters field which is the same
-            # format as used for regular index queries.
-            ws.filters << Filter.new(p)
-            ws.send ({status: 200, message: 'subscribe ok'}.to_json)
-
-            # Send any pending events
-            push_events ws
-          else
-            ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
-          end
-
-        elsif p[:method] == 'unsubscribe'
-          # Handle unsubscribe event
-
-          len = ws.filters.length
-          ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
-          if ws.filters.length < len
-            ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
-          else
-            ws.send ({status: 404, message: 'filter not found'}.to_json)
-          end
-
-        else
-          ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
-        end
-      rescue Oj::Error => e
-        ws.send ({status: 400, message: "malformed request"}.to_json)
-      rescue Exception => e
-        puts "Error handling message: #{$!}"
-        puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-        ws.send ({status: 500, message: 'error'}.to_json)
-        ws.close
-      end
+      handle_message ws, event
     end
 
     # Set up socket close callback
@@ -209,6 +214,7 @@ class EventBus
               conn.async_exec "UNLISTEN *"
             end
           end
+          @bgthread = false
         end
       end
     end