Merge branch '3692-event-bus-fix-and' closes #3692
[arvados.git] / services / api / lib / eventbus.rb
1 require 'eventmachine'
2 require 'oj'
3 require 'faye/websocket'
4 require 'record_filters'
5 require 'load_param'
6
7 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
8 module Faye
9   class WebSocket
10     attr_accessor :user
11     attr_accessor :last_log_id
12     attr_accessor :filters
13   end
14 end
15
16 # Store the filters supplied by the user that will be applied to the logs table
17 # to determine which events to return to the listener.
18 class Filter
19   include LoadParam
20
21   attr_accessor :filters
22
23   def initialize p
24     @params = p
25     load_filters_param
26   end
27
28   def params
29     @params
30   end
31 end
32
33 # Manages websocket connections, accepts subscription messages and publishes
34 # log table events.
35 class EventBus
36   include CurrentApiClient
37   include RecordFilters
38
39   # used in RecordFilters
40   def model_class
41     Log
42   end
43
44   # Initialize EventBus.  Takes no parameters.
45   def initialize
46     @channel = EventMachine::Channel.new
47     @mtx = Mutex.new
48     @bgthread = false
49   end
50
51   # Push out any pending events to the connection +ws+
52   # +id+  the id of the most recent row in the log table, may be nil
53   def push_events ws, id = nil
54       begin
55         # Must have at least one filter set up to receive events
56         if ws.filters.length > 0
57           # Start with log rows readable by user, sorted in ascending order
58           logs = Log.readable_by(ws.user).order("id asc")
59
60           cond_id = nil
61           cond_out = []
62           param_out = []
63
64           if ws.last_log_id
65             # Client is only interested in log rows that are newer than the
66             # last log row seen by the client.
67             cond_id = "logs.id > ?"
68             param_out << ws.last_log_id
69           elsif id
70             # No last log id, so only look at the most recently changed row
71             cond_id = "logs.id = ?"
72             param_out << id.to_i
73           else
74             return
75           end
76
77           # Now process filters provided by client
78           ws.filters.each do |filter|
79             ft = record_filters filter.filters, Log
80             if ft[:cond_out].any?
81               # Join the clauses within a single subscription filter with AND
82               # so it is consistent with regular queries
83               cond_out << "(#{ft[:cond_out].join ') AND ('})"
84               param_out += ft[:param_out]
85             end
86           end
87
88           # Add filters to query
89           if cond_out.any?
90             # Join subscriptions with OR
91             logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
92           else
93             logs = logs.where(cond_id, *param_out)
94           end
95
96           # Finally execute query and actually send the matching log rows
97           logs.each do |l|
98             ws.send(l.as_api_response.to_json)
99             ws.last_log_id = l.id
100           end
101         elsif id
102           # No filters set up, so just record the sequence number
103           ws.last_log_id = id.to_i
104         end
105       rescue Exception => e
106         Rails.logger.warn "Error publishing event: #{$!}"
107         Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
108         ws.send ({status: 500, message: 'error'}.to_json)
109         ws.close
110       end
111   end
112
113   # Handle inbound subscribe or unsubscribe message.
114   def handle_message ws, event
115     begin
116       # Parse event data as JSON
117       p = (Oj.load event.data).symbolize_keys
118
119       if p[:method] == 'subscribe'
120         # Handle subscribe event
121
122         if p[:last_log_id]
123           # Set or reset the last_log_id.  The event bus only reports events
124           # for rows that come after last_log_id.
125           ws.last_log_id = p[:last_log_id].to_i
126         end
127
128         if ws.filters.length < MAX_FILTERS
129           # Add a filter.  This gets the :filters field which is the same
130           # format as used for regular index queries.
131           ws.filters << Filter.new(p)
132           ws.send ({status: 200, message: 'subscribe ok'}.to_json)
133
134           # Send any pending events
135           push_events ws
136         else
137           ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
138         end
139
140       elsif p[:method] == 'unsubscribe'
141         # Handle unsubscribe event
142
143         len = ws.filters.length
144         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
145         if ws.filters.length < len
146           ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
147         else
148           ws.send ({status: 404, message: 'filter not found'}.to_json)
149         end
150
151       else
152         ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
153       end
154     rescue Oj::Error => e
155       ws.send ({status: 400, message: "malformed request"}.to_json)
156     rescue Exception => e
157       Rails.logger.warn "Error handling message: #{$!}"
158       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
159       ws.send ({status: 500, message: 'error'}.to_json)
160       ws.close
161     end
162   end
163
164   # Constant maximum number of filters, to avoid silly huge database queries.
165   MAX_FILTERS = 16
166
167   # Called by RackSocket when a new websocket connection has been established.
168   def on_connect ws
169
170     # Disconnect if no valid API token.
171     # current_user is included from CurrentApiClient
172     if not current_user
173       ws.send ({status: 401, message: "Valid API token required"}.to_json)
174       ws.close
175       return
176     end
177
178     # Initialize our custom fields on the websocket connection object.
179     ws.user = current_user
180     ws.filters = []
181     ws.last_log_id = nil
182
183     # Subscribe to internal postgres notifications through @channel.  This will
184     # call push_events when a notification comes through.
185     sub = @channel.subscribe do |msg|
186       push_events ws, msg
187     end
188
189     # Set up callback for inbound message dispatch.
190     ws.on :message do |event|
191       handle_message ws, event
192     end
193
194     # Set up socket close callback
195     ws.on :close do |event|
196       @channel.unsubscribe sub
197       ws = nil
198     end
199
200     # Start up thread to monitor the Postgres database, if none exists already.
201     @mtx.synchronize do
202       unless @bgthread
203         @bgthread = true
204         Thread.new do
205           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
206           ActiveRecord::Base.connection_pool.with_connection do |connection|
207             conn = connection.instance_variable_get(:@connection)
208             begin
209               conn.async_exec "LISTEN logs"
210               while true
211                 # wait_for_notify will block until there is a change
212                 # notification from Postgres about the logs table, then push
213                 # the notification into the EventMachine channel.  Each
214                 # websocket connection subscribes to the other end of the
215                 # channel and calls #push_events to actually dispatch the
216                 # events to the client.
217                 conn.wait_for_notify do |channel, pid, payload|
218                   @channel.push payload
219                 end
220               end
221             ensure
222               # Don't want the connection to still be listening once we return
223               # it to the pool - could result in weird behavior for the next
224               # thread to check it out.
225               conn.async_exec "UNLISTEN *"
226             end
227           end
228           @bgthread = false
229         end
230       end
231     end
232
233     # Since EventMachine is an asynchronous event based dispatcher, #on_connect
234     # does not block but instead returns immediately after having set up the
235     # websocket and notification channel callbacks.
236   end
237 end