9427: Add connection limit.
[arvados.git] / services / api / lib / eventbus.rb
1 # If any threads raise an unhandled exception, make them all die.
2 # We trust a supervisor like runit to restart the server in this case.
3 Thread.abort_on_exception = true
4
5 require 'eventmachine'
6 require 'oj'
7 require 'faye/websocket'
8 require 'record_filters'
9 require 'load_param'
10 require 'set'
11 require 'thread'
12
13 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
14 module Faye
15   class WebSocket
16     attr_accessor :user
17     attr_accessor :last_log_id
18     attr_accessor :filters
19     attr_accessor :sent_ids
20     attr_accessor :queue
21     attr_accessor :frame_mtx
22   end
23 end
24
25 module WebSocket
26   class Driver
27
28     class Hybi < Driver
29       alias_method :_frame, :frame
30
31       def frame(data, type = nil, code = nil)
32         # Most of the sending activity will be from the thread set up in
33         # on_connect.  However, there is also some automatic activity in the
34         # form of ping/pong messages, so ensure that the frame method (which
35         # actually packs and sends one complete message to the underlying
36         # socket) can only be called by one thread at a time.
37         @socket.frame_mtx.synchronize do
38           _frame(data, type, code)
39         end
40       end
41     end
42   end
43 end
44
45 # Store the filters supplied by the user that will be applied to the logs table
46 # to determine which events to return to the listener.
47 class Filter
48   include LoadParam
49
50   attr_accessor :filters
51
52   def initialize p
53     @params = p
54     load_filters_param
55   end
56
57   def params
58     @params
59   end
60 end
61
62 # Manages websocket connections, accepts subscription messages and publishes
63 # log table events.
64 class EventBus
65   include CurrentApiClient
66   include RecordFilters
67
68   # used in RecordFilters
69   def model_class
70     Log
71   end
72
73   # Initialize EventBus.  Takes no parameters.
74   def initialize
75     @channel = EventMachine::Channel.new
76     @mtx = Mutex.new
77     @bgthread = false
78     @connection_count = 0
79   end
80
81   # Push out any pending events to the connection +ws+
82   # +notify_id+  the id of the most recent row in the log table, may be nil
83   #
84   # This accepts a websocket and a notify_id (this is the row id from Postgres
85   # LISTEN/NOTIFY, it may be nil if called from somewhere else)
86   #
87   # It queries the database for log rows that are either
88   #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
89   #  b) if ws.last_log_id is nil, then it queries the row notify_id
90   #
91   # Regular Arvados permissions are applied using readable_by() and filters using record_filters().
92   def push_events ws, notify_id
93     begin
94       # Must have at least one filter set up to receive events
95       if ws.filters.length > 0
96         # Start with log rows readable by user, sorted in ascending order
97         logs = Log.readable_by(ws.user).order("id asc")
98
99         cond_id = nil
100         cond_out = []
101         param_out = []
102
103         if not ws.last_log_id.nil?
104           # We are catching up from some starting point.
105           cond_id = "logs.id > ?"
106           param_out << ws.last_log_id
107         elsif not notify_id.nil?
108           # Get next row being notified.
109           cond_id = "logs.id = ?"
110           param_out << notify_id
111         else
112           # No log id to start from, nothing to do, return
113           return
114         end
115
116         # Now build filters provided by client
117         ws.filters.each do |filter|
118           ft = record_filters filter.filters, Log
119           if ft[:cond_out].any?
120             # Join the clauses within a single subscription filter with AND
121             # so it is consistent with regular queries
122             cond_out << "(#{ft[:cond_out].join ') AND ('})"
123             param_out += ft[:param_out]
124           end
125         end
126
127         # Add filters to query
128         if cond_out.any?
129           # Join subscriptions with OR
130           logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
131         else
132           logs = logs.where(cond_id, *param_out)
133         end
134
135         # Execute query and actually send the matching log rows
136         logs.each do |l|
137           if not ws.sent_ids.include?(l.id)
138             # only send if not a duplicate
139             ws.send(l.as_api_response.to_json)
140           end
141           if not ws.last_log_id.nil?
142             # record ids only when sending "catchup" messages, not notifies
143             ws.sent_ids << l.id
144           end
145         end
146         ws.last_log_id = nil
147       end
148     rescue ArgumentError => e
149       # There was some kind of user error.
150       Rails.logger.warn "Error publishing event: #{$!}"
151       ws.send ({status: 500, message: $!}.to_json)
152       ws.close
153     rescue => e
154       Rails.logger.warn "Error publishing event: #{$!}"
155       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
156       ws.send ({status: 500, message: $!}.to_json)
157       ws.close
158       # These exceptions typically indicate serious server trouble:
159       # out of memory issues, database connection problems, etc.  Go ahead and
160       # crash; we expect that a supervisor service like runit will restart us.
161       raise
162     end
163   end
164
165   # Handle inbound subscribe or unsubscribe message.
166   def handle_message ws, event
167     begin
168       begin
169         # Parse event data as JSON
170         p = (Oj.strict_load event.data).symbolize_keys
171         filter = Filter.new(p)
172       rescue Oj::Error => e
173         ws.send ({status: 400, message: "malformed request"}.to_json)
174         return
175       end
176
177       if p[:method] == 'subscribe'
178         # Handle subscribe event
179
180         if p[:last_log_id]
181           # Set or reset the last_log_id.  The event bus only reports events
182           # for rows that come after last_log_id.
183           ws.last_log_id = p[:last_log_id].to_i
184           # Reset sent_ids for consistency
185           # (always re-deliver all matching messages following last_log_id)
186           ws.sent_ids = Set.new
187         end
188
189         if ws.filters.length < MAX_FILTERS
190           # Add a filter.  This gets the :filters field which is the same
191           # format as used for regular index queries.
192           ws.filters << filter
193           ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
194
195           # Send any pending events
196           push_events ws, nil
197         else
198           ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
199         end
200
201       elsif p[:method] == 'unsubscribe'
202         # Handle unsubscribe event
203
204         len = ws.filters.length
205         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
206         if ws.filters.length < len
207           ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
208         else
209           ws.send ({status: 404, message: 'filter not found'}.to_json)
210         end
211
212       else
213         ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
214       end
215     rescue => e
216       Rails.logger.warn "Error handling message: #{$!}"
217       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
218       ws.send ({status: 500, message: 'error'}.to_json)
219       ws.close
220     end
221   end
222
223   # Constant maximum number of filters, to avoid silly huge database queries.
224   MAX_FILTERS = 16
225   MAX_NOTIFY_BACKLOG = 1000
226   MAX_CONNECTIONS = 500
227
228   def overloaded?
229     @mtx.synchronize do
230       @connection_count >= MAX_CONNECTIONS
231     end
232   end
233
234   # Called by RackSocket when a new websocket connection has been established.
235   def on_connect ws
236     # Disconnect if no valid API token.
237     # current_user is included from CurrentApiClient
238     if not current_user
239       ws.send ({status: 401, message: "Valid API token required"}.to_json)
240       ws.close
241       return
242     end
243
244     # Initialize our custom fields on the websocket connection object.
245     ws.user = current_user
246     ws.filters = []
247     ws.last_log_id = nil
248     ws.sent_ids = Set.new
249     ws.queue = Queue.new
250     ws.frame_mtx = Mutex.new
251
252     @mtx.synchronize do
253       @connection_count += 1
254     end
255
256     # Subscribe to internal postgres notifications through @channel and
257     # forward them to the thread associated with the connection.
258     sub = @channel.subscribe do |msg|
259       if msg != :term
260         ws.queue << [:notify, msg]
261       else
262         ws.close
263       end
264     end
265
266     # Set up callback for inbound message dispatch.
267     ws.on :message do |event|
268       ws.queue << [:message, event]
269     end
270
271     # Set up socket close callback
272     ws.on :close do |event|
273       @channel.unsubscribe sub
274       ws.queue.clear
275       ws.queue << [:close, nil]
276     end
277
278     # Spin off a new thread to handle sending events to the client.  We need a
279     # separate thread per connection so that a slow client doesn't interfere
280     # with other clients.
281     #
282     # We don't want the loop in the request thread because on a TERM signal,
283     # Puma waits for outstanding requests to complete, and long-lived websocket
284     # connections may not complete in a timely manner.
285     Thread.new do
286       # Loop and react to socket events.
287       loop do
288         eventType, msg = ws.queue.pop
289         if ws.queue.length > MAX_NOTIFY_BACKLOG
290           ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
291           ws.close
292         else
293           if eventType == :message
294             handle_message ws, msg
295           elsif eventType == :notify
296             push_events ws, msg
297           elsif eventType == :close
298             break
299           end
300         end
301       end
302       @mtx.synchronize do
303         @connection_count -= 1
304       end
305     end
306
307     # Start up thread to monitor the Postgres database, if none exists already.
308     @mtx.synchronize do
309       unless @bgthread
310         @bgthread = true
311         Thread.new do
312           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
313           ActiveRecord::Base.connection_pool.with_connection do |connection|
314             conn = connection.instance_variable_get(:@connection)
315             begin
316               conn.async_exec "LISTEN logs"
317               while true
318                 # wait_for_notify will block until there is a change
319                 # notification from Postgres about the logs table, then push
320                 # the notification into the EventMachine channel.  Each
321                 # websocket connection subscribes to the other end of the
322                 # channel and calls #push_events to actually dispatch the
323                 # events to the client.
324                 conn.wait_for_notify do |channel, pid, payload|
325                   @channel.push payload.to_i
326                 end
327               end
328             ensure
329               # Don't want the connection to still be listening once we return
330               # it to the pool - could result in weird behavior for the next
331               # thread to check it out.
332               conn.async_exec "UNLISTEN *"
333             end
334           end
335           @bgthread = false
336         end
337       end
338     end
339
340   end
341 end