Adding more code documentation.
[arvados.git] / services / api / lib / eventbus.rb
index b36378b2da68ef03de45cc47d6c1cdacdfc3facc..8480cf47aa5eb6743f97df9365ebd72610120d49 100644 (file)
@@ -4,6 +4,7 @@ require 'faye/websocket'
 require 'record_filters'
 require 'load_param'
 
+# Patch in user, last_log_id and filters fields into the Faye::Websocket class.
 module Faye
   class WebSocket
     attr_accessor :user
@@ -12,6 +13,8 @@ module Faye
   end
 end
 
+# Store the filters supplied by the user that will be applied to the logs table
+# to determine which events to return to the listener.
 class Filter
   include LoadParam
 
@@ -32,6 +35,8 @@ class Filter
   end
 end
 
+# Manages websocket connections, accepts subscription messages and publishes
+# log table events.
 class EventBus
   include CurrentApiClient
   include RecordFilters
@@ -41,6 +46,7 @@ class EventBus
     Log
   end
 
+  # Initialize EventBus.  Takes no parameters.
   def initialize
     @channel = EventMachine::Channel.new
     @mtx = Mutex.new
@@ -48,11 +54,14 @@ class EventBus
     @filter_id_counter = 0
   end
 
+  # Allocate a new filter id
   def alloc_filter_id
-    (@filter_id_counter += 1)
+    @filter_id_counter += 1
   end
 
-  def push_events ws, msg = nil
+  # Push out any pending events to the connection +ws+
+  # +id+  the id of the most recent row in the log table, may be nil
+  def push_events ws, id = nil
       begin
         # Must have at least one filter set up to receive events
         if ws.filters.length > 0
@@ -62,9 +71,9 @@ class EventBus
           if ws.last_log_id
             # Only interested in log rows that are new
             logs = logs.where("logs.id > ?", ws.last_log_id)
-          elsif msg
+          elsif id
             # No last log id, so only look at the most recently changed row
-            logs = logs.where("logs.id = ?", msg.to_i)
+            logs = logs.where("logs.id = ?", id.to_i)
           else
             return
           end
@@ -88,9 +97,9 @@ class EventBus
             ws.send(l.as_api_response.to_json)
             ws.last_log_id = l.id
           end
-        elsif msg
+        elsif id
           # No filters set up, so just record the sequence number
-          ws.last_log_id = msg.to_i
+          ws.last_log_id = id.to_i
         end
       rescue Exception => e
         puts "Error publishing event: #{$!}"
@@ -100,23 +109,32 @@ class EventBus
       end
   end
 
+  # Constant maximum number of filters, to avoid silly huge database queries.
   MAX_FILTERS = 16
 
+  # Called by RackSocket when a new websocket connection has been established.
   def on_connect ws
+
+    # Disconnect if no valid API token.
+    # current_user is included from CurrentApiClient
     if not current_user
       ws.send ({status: 401, message: "Valid API token required"}.to_json)
       ws.close
       return
     end
 
+    # Initialize our custom fields on the websocket connection object.
     ws.user = current_user
     ws.filters = []
     ws.last_log_id = nil
 
+    # Subscribe to internal postgres notifications through @channel.  This will
+    # call push_events when a notification comes through.
     sub = @channel.subscribe do |msg|
       push_events ws, msg
     end
 
+    # Set up callback for inbound message dispatch.
     ws.on :message do |event|
       begin
         p = (Oj.load event.data).symbolize_keys