Moved application-specific code out from rack_socket to lib/eventbus.rb
[arvados.git] / services / api / lib / eventbus.rb
1 require 'eventmachine'
2 require 'oj'
3 require 'faye/websocket'
4
5 class EventBus
6   def initialize
7     @channel = EventMachine::Channel.new
8     @bgthread = nil
9   end
10
11   def on_connect ws
12       sub = @channel.subscribe do |msg|
13         puts "sending #{msg}"
14         ws.send({:message => "log"}.to_json)
15       end
16
17       ws.on :message do |event|
18         puts "got #{event.data}"
19         ws.send(event.data)
20       end
21
22       ws.on :close do |event|
23         p [:close, event.code, event.reason]
24         @channel.unsubscribe sub
25         ws = nil
26       end
27
28       unless @bgthread
29         @bgthread = true
30         Thread.new do
31           # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
32           ActiveRecord::Base.connection_pool.with_connection do |connection|
33             conn = connection.instance_variable_get(:@connection)
34             begin
35               conn.async_exec "LISTEN logs"
36               while true
37                 conn.wait_for_notify do |channel, pid, payload|
38                   puts "Received a NOTIFY on channel #{channel}"
39                   puts "from PG backend #{pid}"
40                   puts "saying #{payload}"
41                   @channel.push true
42                 end
43               end
44             ensure
45               # Don't want the connection to still be listening once we return
46               # it to the pool - could result in weird behavior for the next
47               # thread to check it out.
48               conn.async_exec "UNLISTEN *"
49             end
50           end
51         end
52       end
53
54   end
55 end