- Added object_owner_uuid to logs table, which records the owner of the object
[arvados.git] / services / api / lib / eventbus.rb
1 require 'eventmachine'
2 require 'oj'
3 require 'faye/websocket'
4 require 'record_filters'
5 require 'load_param'
6
7 module Faye
8   class WebSocket
9     attr_accessor :user
10     attr_accessor :last_log_id
11     attr_accessor :filters
12   end
13 end
14
15 class Filter
16   include LoadParam
17
18   def initialize p
19     @p = p
20     load_filters_param
21   end
22
23   def params
24     @p
25   end
26
27   def filters
28     @filters
29   end
30 end
31
32 class EventBus
33   include CurrentApiClient
34   include RecordFilters
35
36   def initialize
37     @channel = EventMachine::Channel.new
38     @mtx = Mutex.new
39     @bgthread = false
40   end
41
42   def on_connect ws
43     if not current_user
44       ws.send '{"error":"Not logged in"}'
45       ws.close
46       return
47     end
48
49     ws.user = current_user
50     ws.filters = []
51     ws.last_log_id = nil
52
53     sub = @channel.subscribe do |msg|
54       # Must have at least one filter set up to receive events
55       if ws.filters.length > 0
56
57         # Start with log rows readable by user, sorted in ascending order
58         logs = Log.readable_by(ws.user).order("id asc")
59
60         if ws.last_log_id
61           # Only get log rows that are new
62           logs = logs.where("log.id > ? and log.id <= ?", ws.last_log_id, msg.to_i)
63         else
64           # No last log id, so only look at the most recently changed row
65           logs = logs.where("log.id = ?", msg.to_i)
66         end
67
68         # Record the most recent row
69         ws.last_log_id = msg.to_i
70
71         # Now process filters provided by client
72         cond_out = []
73         param_out = []
74         ws.filters.each do |filter|
75           ft = record_filters filter.filters
76           cond_out += ft[:cond_out]
77           param_out += ft[:param_out]
78         end
79
80         # Add filters to query
81         if cond_out.any?
82           logs = logs.where(cond_out.join(' OR '), *param_out)
83         end
84
85         # Finally execute query and send matching rows
86         logs.each do |l|
87           ws.send(l.as_api_response.to_json)
88         end
89       else
90         # No filters set up, so just record the sequence number
91         ws.last_log_id.nil = msg.to_i
92       end
93     end
94
95     ws.on :message do |event|
96       p = oj.parse(event.data)
97       if p[:method] == 'subscribe'
98         if p[:starting_log_id]
99           ws.last_log_id = p[:starting_log_id].to_i
100         end
101         ws.filters.push(Filter.new p)
102       end
103     end
104
105     ws.on :close do |event|
106       @channel.unsubscribe sub
107       ws = nil
108     end
109
110     @mtx.synchronize do
111       unless @bgthread
112         @bgthread = true
113         Thread.new do
114           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
115           ActiveRecord::Base.connection_pool.with_connection do |connection|
116             conn = connection.instance_variable_get(:@connection)
117             begin
118               conn.async_exec "LISTEN logs"
119               while true
120                 conn.wait_for_notify do |channel, pid, payload|
121                   @channel.push payload
122                 end
123               end
124             ensure
125               # Don't want the connection to still be listening once we return
126               # it to the pool - could result in weird behavior for the next
127               # thread to check it out.
128               conn.async_exec "UNLISTEN *"
129             end
130           end
131         end
132       end
133     end
134   end
135 end