5737: Merge branch 'master' into 5737-ruby231
[arvados.git] / services / api / lib / eventbus.rb
1 # If any threads raise an unhandled exception, make them all die.
2 # We trust a supervisor like runit to restart the server in this case.
3 Thread.abort_on_exception = true
4
5 require 'eventmachine'
6 require 'oj'
7 require 'faye/websocket'
8 require 'record_filters'
9 require 'load_param'
10 require 'set'
11 require 'thread'
12
13 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
14 module Faye
15   class WebSocket
16     attr_accessor :user
17     attr_accessor :last_log_id
18     attr_accessor :filters
19     attr_accessor :sent_ids
20     attr_accessor :queue
21     attr_accessor :frame_mtx
22   end
23 end
24
25 module WebSocket
26   class Driver
27
28     class Server
29       alias_method :_write, :write
30
31       def write(data)
32         # Most of the sending activity will be from the thread set up in
33         # on_connect.  However, there is also some automatic activity in the
34         # form of ping/pong messages, so ensure that the write method used to
35         # send one complete message to the underlying socket can only be
36         # called by one thread at a time.
37         self.frame_mtx.synchronize do
38           _write(data)
39         end
40       end
41     end
42   end
43 end
44
45 # Store the filters supplied by the user that will be applied to the logs table
46 # to determine which events to return to the listener.
47 class Filter
48   include LoadParam
49
50   attr_accessor :filters
51
52   def initialize p
53     @params = p
54     load_filters_param
55   end
56
57   def params
58     @params
59   end
60 end
61
62 # Manages websocket connections, accepts subscription messages and publishes
63 # log table events.
64 class EventBus
65   include CurrentApiClient
66   include RecordFilters
67
68   # used in RecordFilters
69   def model_class
70     Log
71   end
72
73   # Initialize EventBus.  Takes no parameters.
74   def initialize
75     @channel = EventMachine::Channel.new
76     @mtx = Mutex.new
77     @bgthread = false
78     @connection_count = 0
79   end
80
81   def send_message(ws, obj)
82     ws.send(Oj.dump(obj, mode: :compat))
83   end
84
85   # Push out any pending events to the connection +ws+
86   # +notify_id+  the id of the most recent row in the log table, may be nil
87   #
88   # This accepts a websocket and a notify_id (this is the row id from Postgres
89   # LISTEN/NOTIFY, it may be nil if called from somewhere else)
90   #
91   # It queries the database for log rows that are either
92   #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
93   #  b) if ws.last_log_id is nil, then it queries the row notify_id
94   #
95   # Regular Arvados permissions are applied using readable_by() and filters using record_filters().
96   def push_events ws, notify_id
97     begin
98       # Must have at least one filter set up to receive events
99       if ws.filters.length > 0
100         # Start with log rows readable by user
101         logs = Log.readable_by(ws.user)
102
103         cond_id = nil
104         cond_out = []
105         param_out = []
106
107         if not ws.last_log_id.nil?
108           # We are catching up from some starting point.
109           cond_id = "logs.id > ?"
110           param_out << ws.last_log_id
111         elsif not notify_id.nil?
112           # Get next row being notified.
113           cond_id = "logs.id = ?"
114           param_out << notify_id
115         else
116           # No log id to start from, nothing to do, return
117           return
118         end
119
120         # Now build filters provided by client
121         ws.filters.each do |filter|
122           ft = record_filters filter.filters, Log
123           if ft[:cond_out].any?
124             # Join the clauses within a single subscription filter with AND
125             # so it is consistent with regular queries
126             cond_out << "(#{ft[:cond_out].join ') AND ('})"
127             param_out += ft[:param_out]
128           end
129         end
130
131         # Add filters to query
132         if cond_out.any?
133           # Join subscriptions with OR
134           logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
135         else
136           logs = logs.where(cond_id, *param_out)
137         end
138
139         # Execute query and actually send the matching log rows. Load
140         # the full log records only when we're ready to send them,
141         # though: otherwise, (1) postgres has to build the whole
142         # result set and return it to us before we can send the first
143         # event, and (2) we store lots of records in memory while
144         # waiting to spool them out to the client. Both of these are
145         # troublesome when log records are large (e.g., a collection
146         # update contains both old and new manifest_text).
147         #
148         # Note: find_each implies order('id asc'), which is what we
149         # want.
150         logs.select('logs.id').find_each do |l|
151           if not ws.sent_ids.include?(l.id)
152             # only send if not a duplicate
153             send_message(ws, Log.find(l.id).as_api_response)
154           end
155           if not ws.last_log_id.nil?
156             # record ids only when sending "catchup" messages, not notifies
157             ws.sent_ids << l.id
158           end
159         end
160         ws.last_log_id = nil
161       end
162     rescue ArgumentError => e
163       # There was some kind of user error.
164       Rails.logger.warn "Error publishing event: #{$!}"
165       send_message(ws, {status: 500, message: $!})
166       ws.close
167     rescue => e
168       Rails.logger.warn "Error publishing event: #{$!}"
169       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
170       send_message(ws, {status: 500, message: $!})
171       ws.close
172       # These exceptions typically indicate serious server trouble:
173       # out of memory issues, database connection problems, etc.  Go ahead and
174       # crash; we expect that a supervisor service like runit will restart us.
175       raise
176     end
177   end
178
179   # Handle inbound subscribe or unsubscribe message.
180   def handle_message ws, event
181     begin
182       begin
183         # Parse event data as JSON
184         p = (Oj.strict_load event.data).symbolize_keys
185         filter = Filter.new(p)
186       rescue Oj::Error => e
187         send_message(ws, {status: 400, message: "malformed request"})
188         return
189       end
190
191       if p[:method] == 'subscribe'
192         # Handle subscribe event
193
194         if p[:last_log_id]
195           # Set or reset the last_log_id.  The event bus only reports events
196           # for rows that come after last_log_id.
197           ws.last_log_id = p[:last_log_id].to_i
198           # Reset sent_ids for consistency
199           # (always re-deliver all matching messages following last_log_id)
200           ws.sent_ids = Set.new
201         end
202
203         if ws.filters.length < Rails.configuration.websocket_max_filters
204           # Add a filter.  This gets the :filters field which is the same
205           # format as used for regular index queries.
206           ws.filters << filter
207           send_message(ws, {status: 200, message: 'subscribe ok', filter: p})
208
209           # Send any pending events
210           push_events ws, nil
211         else
212           send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"})
213         end
214
215       elsif p[:method] == 'unsubscribe'
216         # Handle unsubscribe event
217
218         len = ws.filters.length
219         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
220         if ws.filters.length < len
221           send_message(ws, {status: 200, message: 'unsubscribe ok'})
222         else
223           send_message(ws, {status: 404, message: 'filter not found'})
224         end
225
226       else
227         send_message(ws, {status: 400, message: "missing or unrecognized method"})
228       end
229     rescue => e
230       Rails.logger.warn "Error handling message: #{$!}"
231       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
232       send_message(ws, {status: 500, message: 'error'})
233       ws.close
234     end
235   end
236
237   def overloaded?
238     @mtx.synchronize do
239       @connection_count >= Rails.configuration.websocket_max_connections
240     end
241   end
242
243   # Called by RackSocket when a new websocket connection has been established.
244   def on_connect ws
245     # Disconnect if no valid API token.
246     # current_user is included from CurrentApiClient
247     if not current_user
248       send_message(ws, {status: 401, message: "Valid API token required"})
249       # Wait for the handshake to complete before closing the
250       # socket. Otherwise, nginx responds with HTTP 502 Bad gateway,
251       # and the client never sees our real error message.
252       ws.on :open do |event|
253         ws.close
254       end
255       return
256     end
257
258     # Initialize our custom fields on the websocket connection object.
259     ws.user = current_user
260     ws.filters = []
261     ws.last_log_id = nil
262     ws.sent_ids = Set.new
263     ws.queue = Queue.new
264     ws.frame_mtx = Mutex.new
265
266     @mtx.synchronize do
267       @connection_count += 1
268     end
269
270     # Subscribe to internal postgres notifications through @channel and
271     # forward them to the thread associated with the connection.
272     sub = @channel.subscribe do |msg|
273       if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
274         send_message(ws, {status: 500, message: 'Notify backlog too long'})
275         ws.close
276         @channel.unsubscribe sub
277         ws.queue.clear
278       else
279         ws.queue << [:notify, msg]
280       end
281     end
282
283     # Set up callback for inbound message dispatch.
284     ws.on :message do |event|
285       ws.queue << [:message, event]
286     end
287
288     # Set up socket close callback
289     ws.on :close do |event|
290       @channel.unsubscribe sub
291       ws.queue.clear
292       ws.queue << [:close, nil]
293     end
294
295     # Spin off a new thread to handle sending events to the client.  We need a
296     # separate thread per connection so that a slow client doesn't interfere
297     # with other clients.
298     #
299     # We don't want the loop in the request thread because on a TERM signal,
300     # Puma waits for outstanding requests to complete, and long-lived websocket
301     # connections may not complete in a timely manner.
302     Thread.new do
303       # Loop and react to socket events.
304       begin
305         loop do
306           eventType, msg = ws.queue.pop
307           if eventType == :message
308             handle_message ws, msg
309           elsif eventType == :notify
310             push_events ws, msg
311           elsif eventType == :close
312             break
313           end
314         end
315       ensure
316         @mtx.synchronize do
317           @connection_count -= 1
318         end
319       end
320     end
321
322     # Start up thread to monitor the Postgres database, if none exists already.
323     @mtx.synchronize do
324       unless @bgthread
325         @bgthread = true
326         Thread.new do
327           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
328           ActiveRecord::Base.connection_pool.with_connection do |connection|
329             conn = connection.instance_variable_get(:@connection)
330             begin
331               conn.async_exec "LISTEN logs"
332               while true
333                 # wait_for_notify will block until there is a change
334                 # notification from Postgres about the logs table, then push
335                 # the notification into the EventMachine channel.  Each
336                 # websocket connection subscribes to the other end of the
337                 # channel and calls #push_events to actually dispatch the
338                 # events to the client.
339                 conn.wait_for_notify do |channel, pid, payload|
340                   @channel.push payload.to_i
341                 end
342               end
343             ensure
344               # Don't want the connection to still be listening once we return
345               # it to the pool - could result in weird behavior for the next
346               # thread to check it out.
347               conn.async_exec "UNLISTEN *"
348             end
349           end
350           @bgthread = false
351         end
352       end
353     end
354
355   end
356 end