8019: Rework partial line throttling. Fix sending flush when buffer is ready
[arvados.git] / services / api / lib / eventbus.rb
1 # If any threads raise an unhandled exception, make them all die.
2 # We trust a supervisor like runit to restart the server in this case.
3 Thread.abort_on_exception = true
4
5 require 'eventmachine'
6 require 'faye/websocket'
7 require 'load_param'
8 require 'oj'
9 require 'record_filters'
10 require 'safe_json'
11 require 'set'
12 require 'thread'
13
14 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
15 module Faye
16   class WebSocket
17     attr_accessor :user
18     attr_accessor :last_log_id
19     attr_accessor :filters
20     attr_accessor :sent_ids
21     attr_accessor :queue
22     attr_accessor :frame_mtx
23   end
24 end
25
26 module WebSocket
27   class Driver
28
29     class Server
30       alias_method :_write, :write
31
32       def write(data)
33         # Most of the sending activity will be from the thread set up in
34         # on_connect.  However, there is also some automatic activity in the
35         # form of ping/pong messages, so ensure that the write method used to
36         # send one complete message to the underlying socket can only be
37         # called by one thread at a time.
38         self.frame_mtx.synchronize do
39           _write(data)
40         end
41       end
42     end
43   end
44 end
45
46 # Store the filters supplied by the user that will be applied to the logs table
47 # to determine which events to return to the listener.
48 class Filter
49   include LoadParam
50
51   attr_accessor :filters
52
53   def initialize p
54     @params = p
55     load_filters_param
56   end
57
58   def params
59     @params
60   end
61 end
62
63 # Manages websocket connections, accepts subscription messages and publishes
64 # log table events.
65 class EventBus
66   include CurrentApiClient
67   include RecordFilters
68
69   # used in RecordFilters
70   def model_class
71     Log
72   end
73
74   # Initialize EventBus.  Takes no parameters.
75   def initialize
76     @channel = EventMachine::Channel.new
77     @mtx = Mutex.new
78     @bgthread = false
79     @connection_count = 0
80   end
81
82   def send_message(ws, obj)
83     ws.send(SafeJSON.dump(obj))
84   end
85
86   # Push out any pending events to the connection +ws+
87   # +notify_id+  the id of the most recent row in the log table, may be nil
88   #
89   # This accepts a websocket and a notify_id (this is the row id from Postgres
90   # LISTEN/NOTIFY, it may be nil if called from somewhere else)
91   #
92   # It queries the database for log rows that are either
93   #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
94   #  b) if ws.last_log_id is nil, then it queries the row notify_id
95   #
96   # Regular Arvados permissions are applied using readable_by() and filters using record_filters().
97   def push_events ws, notify_id
98     begin
99       # Must have at least one filter set up to receive events
100       if ws.filters.length > 0
101         # Start with log rows readable by user
102         logs = Log.readable_by(ws.user)
103
104         cond_id = nil
105         cond_out = []
106         param_out = []
107
108         if not ws.last_log_id.nil?
109           # We are catching up from some starting point.
110           cond_id = "logs.id > ?"
111           param_out << ws.last_log_id
112         elsif not notify_id.nil?
113           # Get next row being notified.
114           cond_id = "logs.id = ?"
115           param_out << notify_id
116         else
117           # No log id to start from, nothing to do, return
118           return
119         end
120
121         # Now build filters provided by client
122         ws.filters.each do |filter|
123           ft = record_filters filter.filters, Log
124           if ft[:cond_out].any?
125             # Join the clauses within a single subscription filter with AND
126             # so it is consistent with regular queries
127             cond_out << "(#{ft[:cond_out].join ') AND ('})"
128             param_out += ft[:param_out]
129           end
130         end
131
132         # Add filters to query
133         if cond_out.any?
134           # Join subscriptions with OR
135           logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
136         else
137           logs = logs.where(cond_id, *param_out)
138         end
139
140         # Execute query and actually send the matching log rows. Load
141         # the full log records only when we're ready to send them,
142         # though: otherwise, (1) postgres has to build the whole
143         # result set and return it to us before we can send the first
144         # event, and (2) we store lots of records in memory while
145         # waiting to spool them out to the client. Both of these are
146         # troublesome when log records are large (e.g., a collection
147         # update contains both old and new manifest_text).
148         #
149         # Note: find_each implies order('id asc'), which is what we
150         # want.
151         logs.select('logs.id').find_each do |l|
152           if not ws.sent_ids.include?(l.id)
153             # only send if not a duplicate
154             send_message(ws, Log.find(l.id).as_api_response)
155           end
156           if not ws.last_log_id.nil?
157             # record ids only when sending "catchup" messages, not notifies
158             ws.sent_ids << l.id
159           end
160         end
161         ws.last_log_id = nil
162       end
163     rescue ArgumentError => e
164       # There was some kind of user error.
165       Rails.logger.warn "Error publishing event: #{$!}"
166       send_message(ws, {status: 500, message: $!})
167       ws.close
168     rescue => e
169       Rails.logger.warn "Error publishing event: #{$!}"
170       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
171       send_message(ws, {status: 500, message: $!})
172       ws.close
173       # These exceptions typically indicate serious server trouble:
174       # out of memory issues, database connection problems, etc.  Go ahead and
175       # crash; we expect that a supervisor service like runit will restart us.
176       raise
177     end
178   end
179
180   # Handle inbound subscribe or unsubscribe message.
181   def handle_message ws, event
182     begin
183       begin
184         # Parse event data as JSON
185         p = SafeJSON.load(event.data).symbolize_keys
186         filter = Filter.new(p)
187       rescue Oj::Error => e
188         send_message(ws, {status: 400, message: "malformed request"})
189         return
190       end
191
192       if p[:method] == 'subscribe'
193         # Handle subscribe event
194
195         if p[:last_log_id]
196           # Set or reset the last_log_id.  The event bus only reports events
197           # for rows that come after last_log_id.
198           ws.last_log_id = p[:last_log_id].to_i
199           # Reset sent_ids for consistency
200           # (always re-deliver all matching messages following last_log_id)
201           ws.sent_ids = Set.new
202         end
203
204         if ws.filters.length < Rails.configuration.websocket_max_filters
205           # Add a filter.  This gets the :filters field which is the same
206           # format as used for regular index queries.
207           ws.filters << filter
208           send_message(ws, {status: 200, message: 'subscribe ok', filter: p})
209
210           # Send any pending events
211           push_events ws, nil
212         else
213           send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"})
214         end
215
216       elsif p[:method] == 'unsubscribe'
217         # Handle unsubscribe event
218
219         len = ws.filters.length
220         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
221         if ws.filters.length < len
222           send_message(ws, {status: 200, message: 'unsubscribe ok'})
223         else
224           send_message(ws, {status: 404, message: 'filter not found'})
225         end
226
227       else
228         send_message(ws, {status: 400, message: "missing or unrecognized method"})
229       end
230     rescue => e
231       Rails.logger.warn "Error handling message: #{$!}"
232       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
233       send_message(ws, {status: 500, message: 'error'})
234       ws.close
235     end
236   end
237
238   def overloaded?
239     @mtx.synchronize do
240       @connection_count >= Rails.configuration.websocket_max_connections
241     end
242   end
243
244   # Called by RackSocket when a new websocket connection has been established.
245   def on_connect ws
246     # Disconnect if no valid API token.
247     # current_user is included from CurrentApiClient
248     if not current_user
249       send_message(ws, {status: 401, message: "Valid API token required"})
250       # Wait for the handshake to complete before closing the
251       # socket. Otherwise, nginx responds with HTTP 502 Bad gateway,
252       # and the client never sees our real error message.
253       ws.on :open do |event|
254         ws.close
255       end
256       return
257     end
258
259     # Initialize our custom fields on the websocket connection object.
260     ws.user = current_user
261     ws.filters = []
262     ws.last_log_id = nil
263     ws.sent_ids = Set.new
264     ws.queue = Queue.new
265     ws.frame_mtx = Mutex.new
266
267     @mtx.synchronize do
268       @connection_count += 1
269     end
270
271     # Subscribe to internal postgres notifications through @channel and
272     # forward them to the thread associated with the connection.
273     sub = @channel.subscribe do |msg|
274       if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
275         send_message(ws, {status: 500, message: 'Notify backlog too long'})
276         ws.close
277         @channel.unsubscribe sub
278         ws.queue.clear
279       else
280         ws.queue << [:notify, msg]
281       end
282     end
283
284     # Set up callback for inbound message dispatch.
285     ws.on :message do |event|
286       ws.queue << [:message, event]
287     end
288
289     # Set up socket close callback
290     ws.on :close do |event|
291       @channel.unsubscribe sub
292       ws.queue.clear
293       ws.queue << [:close, nil]
294     end
295
296     # Spin off a new thread to handle sending events to the client.  We need a
297     # separate thread per connection so that a slow client doesn't interfere
298     # with other clients.
299     #
300     # We don't want the loop in the request thread because on a TERM signal,
301     # Puma waits for outstanding requests to complete, and long-lived websocket
302     # connections may not complete in a timely manner.
303     Thread.new do
304       # Loop and react to socket events.
305       begin
306         loop do
307           eventType, msg = ws.queue.pop
308           if eventType == :message
309             handle_message ws, msg
310           elsif eventType == :notify
311             push_events ws, msg
312           elsif eventType == :close
313             break
314           end
315         end
316       ensure
317         @mtx.synchronize do
318           @connection_count -= 1
319         end
320         ActiveRecord::Base.connection.close
321       end
322     end
323
324     # Start up thread to monitor the Postgres database, if none exists already.
325     @mtx.synchronize do
326       unless @bgthread
327         @bgthread = true
328         Thread.new do
329           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
330           ActiveRecord::Base.connection_pool.with_connection do |connection|
331             conn = connection.instance_variable_get(:@connection)
332             begin
333               conn.async_exec "LISTEN logs"
334               while true
335                 # wait_for_notify will block until there is a change
336                 # notification from Postgres about the logs table, then push
337                 # the notification into the EventMachine channel.  Each
338                 # websocket connection subscribes to the other end of the
339                 # channel and calls #push_events to actually dispatch the
340                 # events to the client.
341                 conn.wait_for_notify do |channel, pid, payload|
342                   @channel.push payload.to_i
343                 end
344               end
345             ensure
346               # Don't want the connection to still be listening once we return
347               # it to the pool - could result in weird behavior for the next
348               # thread to check it out.
349               conn.async_exec "UNLISTEN *"
350             end
351           end
352           @bgthread = false
353         end
354       end
355     end
356
357   end
358 end