Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / services / api / lib / eventbus.rb
1 # If any threads raise an unhandled exception, make them all die.
2 # We trust a supervisor like runit to restart the server in this case.
3 Thread.abort_on_exception = true
4
5 require 'eventmachine'
6 require 'oj'
7 require 'faye/websocket'
8 require 'record_filters'
9 require 'load_param'
10 require 'set'
11 require 'thread'
12
13 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
14 module Faye
15   class WebSocket
16     attr_accessor :user
17     attr_accessor :last_log_id
18     attr_accessor :filters
19     attr_accessor :sent_ids
20     attr_accessor :queue
21     attr_accessor :frame_mtx
22   end
23 end
24
25 module WebSocket
26   class Driver
27
28     class Server
29       alias_method :_write, :write
30
31       def write(data)
32         # Most of the sending activity will be from the thread set up in
33         # on_connect.  However, there is also some automatic activity in the
34         # form of ping/pong messages, so ensure that the write method used to
35         # send one complete message to the underlying socket can only be
36         # called by one thread at a time.
37         self.frame_mtx.synchronize do
38           _write(data)
39         end
40       end
41     end
42   end
43 end
44
45 # Store the filters supplied by the user that will be applied to the logs table
46 # to determine which events to return to the listener.
47 class Filter
48   include LoadParam
49
50   attr_accessor :filters
51
52   def initialize p
53     @params = p
54     load_filters_param
55   end
56
57   def params
58     @params
59   end
60 end
61
62 # Manages websocket connections, accepts subscription messages and publishes
63 # log table events.
64 class EventBus
65   include CurrentApiClient
66   include RecordFilters
67
68   # used in RecordFilters
69   def model_class
70     Log
71   end
72
73   # Initialize EventBus.  Takes no parameters.
74   def initialize
75     @channel = EventMachine::Channel.new
76     @mtx = Mutex.new
77     @bgthread = false
78     @connection_count = 0
79   end
80
81   # Push out any pending events to the connection +ws+
82   # +notify_id+  the id of the most recent row in the log table, may be nil
83   #
84   # This accepts a websocket and a notify_id (this is the row id from Postgres
85   # LISTEN/NOTIFY, it may be nil if called from somewhere else)
86   #
87   # It queries the database for log rows that are either
88   #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
89   #  b) if ws.last_log_id is nil, then it queries the row notify_id
90   #
91   # Regular Arvados permissions are applied using readable_by() and filters using record_filters().
92   def push_events ws, notify_id
93     begin
94       # Must have at least one filter set up to receive events
95       if ws.filters.length > 0
96         # Start with log rows readable by user
97         logs = Log.readable_by(ws.user)
98
99         cond_id = nil
100         cond_out = []
101         param_out = []
102
103         if not ws.last_log_id.nil?
104           # We are catching up from some starting point.
105           cond_id = "logs.id > ?"
106           param_out << ws.last_log_id
107         elsif not notify_id.nil?
108           # Get next row being notified.
109           cond_id = "logs.id = ?"
110           param_out << notify_id
111         else
112           # No log id to start from, nothing to do, return
113           return
114         end
115
116         # Now build filters provided by client
117         ws.filters.each do |filter|
118           ft = record_filters filter.filters, Log
119           if ft[:cond_out].any?
120             # Join the clauses within a single subscription filter with AND
121             # so it is consistent with regular queries
122             cond_out << "(#{ft[:cond_out].join ') AND ('})"
123             param_out += ft[:param_out]
124           end
125         end
126
127         # Add filters to query
128         if cond_out.any?
129           # Join subscriptions with OR
130           logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
131         else
132           logs = logs.where(cond_id, *param_out)
133         end
134
135         # Execute query and actually send the matching log rows. Load
136         # the full log records only when we're ready to send them,
137         # though: otherwise, (1) postgres has to build the whole
138         # result set and return it to us before we can send the first
139         # event, and (2) we store lots of records in memory while
140         # waiting to spool them out to the client. Both of these are
141         # troublesome when log records are large (e.g., a collection
142         # update contains both old and new manifest_text).
143         #
144         # Note: find_each implies order('id asc'), which is what we
145         # want.
146         logs.select('logs.id').find_each do |l|
147           if not ws.sent_ids.include?(l.id)
148             # only send if not a duplicate
149             ws.send(Log.find(l.id).as_api_response.to_json)
150           end
151           if not ws.last_log_id.nil?
152             # record ids only when sending "catchup" messages, not notifies
153             ws.sent_ids << l.id
154           end
155         end
156         ws.last_log_id = nil
157       end
158     rescue ArgumentError => e
159       # There was some kind of user error.
160       Rails.logger.warn "Error publishing event: #{$!}"
161       ws.send ({status: 500, message: $!}.to_json)
162       ws.close
163     rescue => e
164       Rails.logger.warn "Error publishing event: #{$!}"
165       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
166       ws.send ({status: 500, message: $!}.to_json)
167       ws.close
168       # These exceptions typically indicate serious server trouble:
169       # out of memory issues, database connection problems, etc.  Go ahead and
170       # crash; we expect that a supervisor service like runit will restart us.
171       raise
172     end
173   end
174
175   # Handle inbound subscribe or unsubscribe message.
176   def handle_message ws, event
177     begin
178       begin
179         # Parse event data as JSON
180         p = (Oj.strict_load event.data).symbolize_keys
181         filter = Filter.new(p)
182       rescue Oj::Error => e
183         ws.send ({status: 400, message: "malformed request"}.to_json)
184         return
185       end
186
187       if p[:method] == 'subscribe'
188         # Handle subscribe event
189
190         if p[:last_log_id]
191           # Set or reset the last_log_id.  The event bus only reports events
192           # for rows that come after last_log_id.
193           ws.last_log_id = p[:last_log_id].to_i
194           # Reset sent_ids for consistency
195           # (always re-deliver all matching messages following last_log_id)
196           ws.sent_ids = Set.new
197         end
198
199         if ws.filters.length < Rails.configuration.websocket_max_filters
200           # Add a filter.  This gets the :filters field which is the same
201           # format as used for regular index queries.
202           ws.filters << filter
203           ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
204
205           # Send any pending events
206           push_events ws, nil
207         else
208           ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
209         end
210
211       elsif p[:method] == 'unsubscribe'
212         # Handle unsubscribe event
213
214         len = ws.filters.length
215         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
216         if ws.filters.length < len
217           ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
218         else
219           ws.send ({status: 404, message: 'filter not found'}.to_json)
220         end
221
222       else
223         ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
224       end
225     rescue => e
226       Rails.logger.warn "Error handling message: #{$!}"
227       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
228       ws.send ({status: 500, message: 'error'}.to_json)
229       ws.close
230     end
231   end
232
233   def overloaded?
234     @mtx.synchronize do
235       @connection_count >= Rails.configuration.websocket_max_connections
236     end
237   end
238
239   # Called by RackSocket when a new websocket connection has been established.
240   def on_connect ws
241     # Disconnect if no valid API token.
242     # current_user is included from CurrentApiClient
243     if not current_user
244       ws.send ({status: 401, message: "Valid API token required"}.to_json)
245       ws.close
246       return
247     end
248
249     # Initialize our custom fields on the websocket connection object.
250     ws.user = current_user
251     ws.filters = []
252     ws.last_log_id = nil
253     ws.sent_ids = Set.new
254     ws.queue = Queue.new
255     ws.frame_mtx = Mutex.new
256
257     @mtx.synchronize do
258       @connection_count += 1
259     end
260
261     # Subscribe to internal postgres notifications through @channel and
262     # forward them to the thread associated with the connection.
263     sub = @channel.subscribe do |msg|
264       if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
265         ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
266         ws.close
267         @channel.unsubscribe sub
268         ws.queue.clear
269       else
270         ws.queue << [:notify, msg]
271       end
272     end
273
274     # Set up callback for inbound message dispatch.
275     ws.on :message do |event|
276       ws.queue << [:message, event]
277     end
278
279     # Set up socket close callback
280     ws.on :close do |event|
281       @channel.unsubscribe sub
282       ws.queue.clear
283       ws.queue << [:close, nil]
284     end
285
286     # Spin off a new thread to handle sending events to the client.  We need a
287     # separate thread per connection so that a slow client doesn't interfere
288     # with other clients.
289     #
290     # We don't want the loop in the request thread because on a TERM signal,
291     # Puma waits for outstanding requests to complete, and long-lived websocket
292     # connections may not complete in a timely manner.
293     Thread.new do
294       # Loop and react to socket events.
295       begin
296         loop do
297           eventType, msg = ws.queue.pop
298           if eventType == :message
299             handle_message ws, msg
300           elsif eventType == :notify
301             push_events ws, msg
302           elsif eventType == :close
303             break
304           end
305         end
306       ensure
307         @mtx.synchronize do
308           @connection_count -= 1
309         end
310       end
311     end
312
313     # Start up thread to monitor the Postgres database, if none exists already.
314     @mtx.synchronize do
315       unless @bgthread
316         @bgthread = true
317         Thread.new do
318           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
319           ActiveRecord::Base.connection_pool.with_connection do |connection|
320             conn = connection.instance_variable_get(:@connection)
321             begin
322               conn.async_exec "LISTEN logs"
323               while true
324                 # wait_for_notify will block until there is a change
325                 # notification from Postgres about the logs table, then push
326                 # the notification into the EventMachine channel.  Each
327                 # websocket connection subscribes to the other end of the
328                 # channel and calls #push_events to actually dispatch the
329                 # events to the client.
330                 conn.wait_for_notify do |channel, pid, payload|
331                   @channel.push payload.to_i
332                 end
333               end
334             ensure
335               # Don't want the connection to still be listening once we return
336               # it to the pool - could result in weird behavior for the next
337               # thread to check it out.
338               conn.async_exec "UNLISTEN *"
339             end
340           end
341           @bgthread = false
342         end
343       end
344     end
345
346   end
347 end