9388: Add ws.notify_queue to ensure that notifies occurring during "catch up"
[arvados.git] / services / api / lib / eventbus.rb
1 # If any threads raise an unhandled exception, make them all die.
2 # We trust a supervisor like runit to restart the server in this case.
3 Thread.abort_on_exception = true
4
5 require 'eventmachine'
6 require 'oj'
7 require 'faye/websocket'
8 require 'record_filters'
9 require 'load_param'
10 require 'set'
11
12 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
13 module Faye
14   class WebSocket
15     attr_accessor :user
16     attr_accessor :last_log_id
17     attr_accessor :filters
18     attr_accessor :sent_ids
19     attr_accessor :notify_queue
20   end
21 end
22
23 # Store the filters supplied by the user that will be applied to the logs table
24 # to determine which events to return to the listener.
25 class Filter
26   include LoadParam
27
28   attr_accessor :filters
29
30   def initialize p
31     @params = p
32     load_filters_param
33   end
34
35   def params
36     @params
37   end
38 end
39
40 # Manages websocket connections, accepts subscription messages and publishes
41 # log table events.
42 class EventBus
43   include CurrentApiClient
44   include RecordFilters
45
46   # used in RecordFilters
47   def model_class
48     Log
49   end
50
51   # Initialize EventBus.  Takes no parameters.
52   def initialize
53     @channel = EventMachine::Channel.new
54     @mtx = Mutex.new
55     @bgthread = false
56   end
57
58   # Push out any pending events to the connection +ws+
59   # +notify_id+  the id of the most recent row in the log table, may be nil
60   #
61   # This accepts a websocket and a notify_id (this is the row id from Postgres
62   # LISTEN/NOTIFY, it may be nil if called from somewhere else)
63   #
64   # It queries the database for log rows that are either
65   #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
66   #  b) if ws.last_log_id is nil, then it queries rows starting with notify_id
67   #
68   # Regular Arvados permissions are applied using readable_by() and filters using record_filters()
69   # To avoid clogging up the database, queries are limited to batches of 100.  It will schedule a new
70   # push_events call if there are more log rows to send.
71   def push_events ws, notify_id
72     begin
73       # Must have at least one filter set up to receive events
74       if ws.filters.length > 0
75         # Start with log rows readable by user, sorted in ascending order
76         logs = Log.readable_by(ws.user).order("id asc")
77
78         cond_id = nil
79         cond_out = []
80         param_out = []
81
82         if not notify_id.nil?
83           ws.notify_queue.unshift notify_id
84         end
85
86         if not ws.last_log_id.nil?
87           # We are catching up from some starting point.
88           cond_id = "logs.id > ?"
89           param_out << ws.last_log_id
90         elsif ws.notify_queue.length > 0
91           # Get next row being notified.
92           cond_id = "logs.id = ?"
93           param_out << ws.notify_queue.pop
94         else
95           # No log id to start from, nothing to do, return
96           return
97         end
98
99         # Now build filters provided by client
100         ws.filters.each do |filter|
101           ft = record_filters filter.filters, Log
102           if ft[:cond_out].any?
103             # Join the clauses within a single subscription filter with AND
104             # so it is consistent with regular queries
105             cond_out << "(#{ft[:cond_out].join ') AND ('})"
106             param_out += ft[:param_out]
107           end
108         end
109
110         # Add filters to query
111         if cond_out.any?
112           # Join subscriptions with OR
113           logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
114         else
115           logs = logs.where(cond_id, *param_out)
116         end
117
118         # Execute query and actually send the matching log rows
119         count = 0
120         limit = 10
121
122         lastid = nil
123         logs.limit(limit).each do |l|
124           if not ws.sent_ids.include?(l.id)
125             # only send if not a duplicate
126             ws.send(l.as_api_response.to_json)
127           end
128           if not ws.last_log_id.nil?
129             # record ids only when sending "catchup" messages, not notifies
130             ws.sent_ids << l.id
131           end
132           lastid = l.id
133           count += 1
134         end
135
136         if count == limit
137           # Number of rows returned was capped by limit(), we need to schedule
138           # another query to get more logs (will start from last_log_id
139           # reported by current query)
140           ws.last_log_id = lastid
141           EventMachine::next_tick do
142             push_events ws, nil
143           end
144         elsif !ws.last_log_id.nil?
145           # Done catching up
146           ws.last_log_id = nil
147         end
148
149         if ws.notify_queue.length > 0
150           EventMachine::next_tick do
151             push_events ws, nil
152           end
153         end
154       end
155     rescue ArgumentError => e
156       # There was some kind of user error.
157       Rails.logger.warn "Error publishing event: #{$!}"
158       ws.send ({status: 500, message: $!}.to_json)
159       ws.close
160     rescue => e
161       Rails.logger.warn "Error publishing event: #{$!}"
162       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
163       ws.send ({status: 500, message: $!}.to_json)
164       ws.close
165       # These exceptions typically indicate serious server trouble:
166       # out of memory issues, database connection problems, etc.  Go ahead and
167       # crash; we expect that a supervisor service like runit will restart us.
168       raise
169     end
170   end
171
172   # Handle inbound subscribe or unsubscribe message.
173   def handle_message ws, event
174     begin
175       begin
176         # Parse event data as JSON
177         p = (Oj.strict_load event.data).symbolize_keys
178         filter = Filter.new(p)
179       rescue Oj::Error => e
180         ws.send ({status: 400, message: "malformed request"}.to_json)
181         return
182       end
183
184       if p[:method] == 'subscribe'
185         # Handle subscribe event
186
187         if p[:last_log_id]
188           # Set or reset the last_log_id.  The event bus only reports events
189           # for rows that come after last_log_id.
190           ws.last_log_id = p[:last_log_id].to_i
191         end
192
193         if ws.filters.length < MAX_FILTERS
194           # Add a filter.  This gets the :filters field which is the same
195           # format as used for regular index queries.
196           ws.filters << filter
197           ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
198
199           # Send any pending events
200           push_events ws, nil
201         else
202           ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
203         end
204
205       elsif p[:method] == 'unsubscribe'
206         # Handle unsubscribe event
207
208         len = ws.filters.length
209         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
210         if ws.filters.length < len
211           ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
212         else
213           ws.send ({status: 404, message: 'filter not found'}.to_json)
214         end
215
216       else
217         ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
218       end
219     rescue => e
220       Rails.logger.warn "Error handling message: #{$!}"
221       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
222       ws.send ({status: 500, message: 'error'}.to_json)
223       ws.close
224     end
225   end
226
227   # Constant maximum number of filters, to avoid silly huge database queries.
228   MAX_FILTERS = 16
229
230   # Called by RackSocket when a new websocket connection has been established.
231   def on_connect ws
232
233     # Disconnect if no valid API token.
234     # current_user is included from CurrentApiClient
235     if not current_user
236       ws.send ({status: 401, message: "Valid API token required"}.to_json)
237       ws.close
238       return
239     end
240
241     # Initialize our custom fields on the websocket connection object.
242     ws.user = current_user
243     ws.filters = []
244     ws.last_log_id = nil
245     ws.sent_ids = Set.new
246     ws.notify_queue = Array.new
247
248     # Subscribe to internal postgres notifications through @channel.  This will
249     # call push_events when a notification comes through.
250     sub = @channel.subscribe do |msg|
251       push_events ws, msg
252     end
253
254     # Set up callback for inbound message dispatch.
255     ws.on :message do |event|
256       handle_message ws, event
257     end
258
259     # Set up socket close callback
260     ws.on :close do |event|
261       @channel.unsubscribe sub
262       ws = nil
263     end
264
265     # Start up thread to monitor the Postgres database, if none exists already.
266     @mtx.synchronize do
267       unless @bgthread
268         @bgthread = true
269         Thread.new do
270           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
271           ActiveRecord::Base.connection_pool.with_connection do |connection|
272             conn = connection.instance_variable_get(:@connection)
273             begin
274               conn.async_exec "LISTEN logs"
275               while true
276                 # wait_for_notify will block until there is a change
277                 # notification from Postgres about the logs table, then push
278                 # the notification into the EventMachine channel.  Each
279                 # websocket connection subscribes to the other end of the
280                 # channel and calls #push_events to actually dispatch the
281                 # events to the client.
282                 conn.wait_for_notify do |channel, pid, payload|
283                   @channel.push payload.to_i
284                 end
285               end
286             ensure
287               # Don't want the connection to still be listening once we return
288               # it to the pool - could result in weird behavior for the next
289               # thread to check it out.
290               conn.async_exec "UNLISTEN *"
291             end
292           end
293           @bgthread = false
294         end
295       end
296     end
297
298     # Since EventMachine is an asynchronous event based dispatcher, #on_connect
299     # does not block but instead returns immediately after having set up the
300     # websocket and notification channel callbacks.
301   end
302 end