aaeebdccf0cd52e8d0fb38aa38ce9d2bf436b626
[arvados.git] / services / api / lib / eventbus.rb
1 # If any threads raise an unhandled exception, make them all die.
2 # We trust a supervisor like runit to restart the server in this case.
3 Thread.abort_on_exception = true
4
5 require 'eventmachine'
6 require 'oj'
7 require 'faye/websocket'
8 require 'record_filters'
9 require 'load_param'
10 require 'set'
11 require 'thread'
12
13 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
14 module Faye
15   class WebSocket
16     attr_accessor :user
17     attr_accessor :last_log_id
18     attr_accessor :filters
19     attr_accessor :sent_ids
20     attr_accessor :queue
21     attr_accessor :frame_mtx
22   end
23 end
24
25 module WebSocket
26   class Driver
27
28     class Server
29       alias_method :_write, :write
30
31       def write(data)
32         # Most of the sending activity will be from the thread set up in
33         # on_connect.  However, there is also some automatic activity in the
34         # form of ping/pong messages, so ensure that the write method used to
35         # send one complete message to the underlying socket can only be
36         # called by one thread at a time.
37         self.frame_mtx.synchronize do
38           _write(data)
39         end
40       end
41     end
42   end
43 end
44
45 # Store the filters supplied by the user that will be applied to the logs table
46 # to determine which events to return to the listener.
47 class Filter
48   include LoadParam
49
50   attr_accessor :filters
51
52   def initialize p
53     @params = p
54     load_filters_param
55   end
56
57   def params
58     @params
59   end
60 end
61
62 # Manages websocket connections, accepts subscription messages and publishes
63 # log table events.
64 class EventBus
65   include CurrentApiClient
66   include RecordFilters
67
68   # used in RecordFilters
69   def model_class
70     Log
71   end
72
73   # Initialize EventBus.  Takes no parameters.
74   def initialize
75     @channel = EventMachine::Channel.new
76     @mtx = Mutex.new
77     @bgthread = false
78     @connection_count = 0
79   end
80
81   # Push out any pending events to the connection +ws+
82   # +notify_id+  the id of the most recent row in the log table, may be nil
83   #
84   # This accepts a websocket and a notify_id (this is the row id from Postgres
85   # LISTEN/NOTIFY, it may be nil if called from somewhere else)
86   #
87   # It queries the database for log rows that are either
88   #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
89   #  b) if ws.last_log_id is nil, then it queries the row notify_id
90   #
91   # Regular Arvados permissions are applied using readable_by() and filters using record_filters().
92   def push_events ws, notify_id
93     begin
94       # Must have at least one filter set up to receive events
95       if ws.filters.length > 0
96         # Start with log rows readable by user, sorted in ascending order
97         logs = Log.readable_by(ws.user).order("id asc")
98
99         cond_id = nil
100         cond_out = []
101         param_out = []
102
103         if not ws.last_log_id.nil?
104           # We are catching up from some starting point.
105           cond_id = "logs.id > ?"
106           param_out << ws.last_log_id
107         elsif not notify_id.nil?
108           # Get next row being notified.
109           cond_id = "logs.id = ?"
110           param_out << notify_id
111         else
112           # No log id to start from, nothing to do, return
113           return
114         end
115
116         # Now build filters provided by client
117         ws.filters.each do |filter|
118           ft = record_filters filter.filters, Log
119           if ft[:cond_out].any?
120             # Join the clauses within a single subscription filter with AND
121             # so it is consistent with regular queries
122             cond_out << "(#{ft[:cond_out].join ') AND ('})"
123             param_out += ft[:param_out]
124           end
125         end
126
127         # Add filters to query
128         if cond_out.any?
129           # Join subscriptions with OR
130           logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
131         else
132           logs = logs.where(cond_id, *param_out)
133         end
134
135         # Execute query and actually send the matching log rows
136         logs.each do |l|
137           if not ws.sent_ids.include?(l.id)
138             # only send if not a duplicate
139             ws.send(l.as_api_response.to_json)
140           end
141           if not ws.last_log_id.nil?
142             # record ids only when sending "catchup" messages, not notifies
143             ws.sent_ids << l.id
144           end
145         end
146         ws.last_log_id = nil
147       end
148     rescue ArgumentError => e
149       # There was some kind of user error.
150       Rails.logger.warn "Error publishing event: #{$!}"
151       ws.send ({status: 500, message: $!}.to_json)
152       ws.close
153     rescue => e
154       Rails.logger.warn "Error publishing event: #{$!}"
155       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
156       ws.send ({status: 500, message: $!}.to_json)
157       ws.close
158       # These exceptions typically indicate serious server trouble:
159       # out of memory issues, database connection problems, etc.  Go ahead and
160       # crash; we expect that a supervisor service like runit will restart us.
161       raise
162     end
163   end
164
165   # Handle inbound subscribe or unsubscribe message.
166   def handle_message ws, event
167     begin
168       begin
169         # Parse event data as JSON
170         p = (Oj.strict_load event.data).symbolize_keys
171         filter = Filter.new(p)
172       rescue Oj::Error => e
173         ws.send ({status: 400, message: "malformed request"}.to_json)
174         return
175       end
176
177       if p[:method] == 'subscribe'
178         # Handle subscribe event
179
180         if p[:last_log_id]
181           # Set or reset the last_log_id.  The event bus only reports events
182           # for rows that come after last_log_id.
183           ws.last_log_id = p[:last_log_id].to_i
184           # Reset sent_ids for consistency
185           # (always re-deliver all matching messages following last_log_id)
186           ws.sent_ids = Set.new
187         end
188
189         if ws.filters.length < Rails.configuration.websocket_max_filters
190           # Add a filter.  This gets the :filters field which is the same
191           # format as used for regular index queries.
192           ws.filters << filter
193           ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
194
195           # Send any pending events
196           push_events ws, nil
197         else
198           ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
199         end
200
201       elsif p[:method] == 'unsubscribe'
202         # Handle unsubscribe event
203
204         len = ws.filters.length
205         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
206         if ws.filters.length < len
207           ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
208         else
209           ws.send ({status: 404, message: 'filter not found'}.to_json)
210         end
211
212       else
213         ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
214       end
215     rescue => e
216       Rails.logger.warn "Error handling message: #{$!}"
217       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
218       ws.send ({status: 500, message: 'error'}.to_json)
219       ws.close
220     end
221   end
222
223   def overloaded?
224     @mtx.synchronize do
225       @connection_count >= Rails.configuration.websocket_max_connections
226     end
227   end
228
229   # Called by RackSocket when a new websocket connection has been established.
230   def on_connect ws
231     # Disconnect if no valid API token.
232     # current_user is included from CurrentApiClient
233     if not current_user
234       ws.send ({status: 401, message: "Valid API token required"}.to_json)
235       ws.close
236       return
237     end
238
239     # Initialize our custom fields on the websocket connection object.
240     ws.user = current_user
241     ws.filters = []
242     ws.last_log_id = nil
243     ws.sent_ids = Set.new
244     ws.queue = Queue.new
245     ws.frame_mtx = Mutex.new
246
247     @mtx.synchronize do
248       @connection_count += 1
249     end
250
251     # Subscribe to internal postgres notifications through @channel and
252     # forward them to the thread associated with the connection.
253     sub = @channel.subscribe do |msg|
254       if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
255         ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
256         ws.close
257         @channel.unsubscribe sub
258         ws.queue.clear
259       else
260         ws.queue << [:notify, msg]
261       end
262     end
263
264     # Set up callback for inbound message dispatch.
265     ws.on :message do |event|
266       ws.queue << [:message, event]
267     end
268
269     # Set up socket close callback
270     ws.on :close do |event|
271       @channel.unsubscribe sub
272       ws.queue.clear
273       ws.queue << [:close, nil]
274     end
275
276     # Spin off a new thread to handle sending events to the client.  We need a
277     # separate thread per connection so that a slow client doesn't interfere
278     # with other clients.
279     #
280     # We don't want the loop in the request thread because on a TERM signal,
281     # Puma waits for outstanding requests to complete, and long-lived websocket
282     # connections may not complete in a timely manner.
283     Thread.new do
284       # Loop and react to socket events.
285       begin
286         loop do
287           eventType, msg = ws.queue.pop
288           if eventType == :message
289             handle_message ws, msg
290           elsif eventType == :notify
291             push_events ws, msg
292           elsif eventType == :close
293             break
294           end
295         end
296       ensure
297         @mtx.synchronize do
298           @connection_count -= 1
299         end
300       end
301     end
302
303     # Start up thread to monitor the Postgres database, if none exists already.
304     @mtx.synchronize do
305       unless @bgthread
306         @bgthread = true
307         Thread.new do
308           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
309           ActiveRecord::Base.connection_pool.with_connection do |connection|
310             conn = connection.instance_variable_get(:@connection)
311             begin
312               conn.async_exec "LISTEN logs"
313               while true
314                 # wait_for_notify will block until there is a change
315                 # notification from Postgres about the logs table, then push
316                 # the notification into the EventMachine channel.  Each
317                 # websocket connection subscribes to the other end of the
318                 # channel and calls #push_events to actually dispatch the
319                 # events to the client.
320                 conn.wait_for_notify do |channel, pid, payload|
321                   @channel.push payload.to_i
322                 end
323               end
324             ensure
325               # Don't want the connection to still be listening once we return
326               # it to the pool - could result in weird behavior for the next
327               # thread to check it out.
328               conn.async_exec "UNLISTEN *"
329             end
330           end
331           @bgthread = false
332         end
333       end
334     end
335
336   end
337 end