Added code to properly initialize EventMachine to deal with Passenger forking.
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 21 Apr 2014 20:53:42 +0000 (16:53 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 21 Apr 2014 20:53:42 +0000 (16:53 -0400)
Added postgres NOTIFY to ArvadosModel#log_change
Added postgres LISTEN to background thread, which posts a message to the websocket.
Notification works!

services/api/app/middlewares/rack_socket.rb
services/api/app/models/arvados_model.rb

index 7fcba83cfc6effc56adf775dc1726bbdc54f30f0..779c67503046a0f49dac738685a5a5ba73e90fd0 100644 (file)
+require 'rack'
+require 'faye/websocket'
+require 'oj'
+require 'eventmachine'
 
-  require 'faye/websocket'
+class RackSocket
 
-  class RackSocket
+  DEFAULT_ENDPOINT  = '/websocket'
 
-    DEFAULT_ENDPOINT  = '/websocket'
+  def die_gracefully_on_signal
+    Signal.trap("INT") { EM.stop }
+    Signal.trap("TERM") { EM.stop }
+  end
+
+  def initialize(app = nil, options = nil)
+    @app = app if app.respond_to?(:call)
+    @options = [app, options].grep(Hash).first || {}
+    @endpoint = @options[:mount] || DEFAULT_ENDPOINT
 
-    def initialize(app = nil, options = nil)
-      @app = app if app.respond_to?(:call)
-      @options = [app, options].grep(Hash).first || {}
-      @endpoint = @options[:mount] || DEFAULT_ENDPOINT
+    # from https://gist.github.com/eatenbyagrue/1338545#file-eventmachine-rb
+    if defined?(PhusionPassenger)
+      PhusionPassenger.on_event(:starting_worker_process) do |forked|
+        # for passenger, we need to avoid orphaned threads
+        if forked && EM.reactor_running?
+          EM.stop
+        end
+        Thread.new {
+          EM.run
+        }
+        die_gracefully_on_signal
+      end
+    else
+      # faciliates debugging
+      Thread.abort_on_exception = true
+      # just spawn a thread and start it up
+      Thread.new {
+        EM.run
+      }
     end
 
-    def call env
-      request = Rack::Request.new(env)
-      if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
-        ws = Faye::WebSocket.new(env)
+    @channel = EventMachine::Channel.new
+    @bgthread = nil
+  end
 
-        ws.on :message do |event|
-          puts "got #{event.data}"
-          ws.send(event.data)
-        end
+  def call env
+    request = Rack::Request.new(env)
+    if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
+      ws = Faye::WebSocket.new(env)
 
-        ws.on :close do |event|
-          p [:close, event.code, event.reason]
-          ws = nil
-        end
+      sub = @channel.subscribe do |msg|
+        puts "sending #{msg}"
+        ws.send({:message => "log"}.to_json)
+      end
 
-        # Return async Rack response
-        ws.rack_response
-      else
-        @app.call env
+      ws.on :message do |event|
+        puts "got #{event.data}"
+        ws.send(event.data)
+      end
+
+      ws.on :close do |event|
+        p [:close, event.code, event.reason]
+        @channel.unsubscribe sub
+        ws = nil
+      end
+
+      unless @bgthread
+        @bgthread = true
+        Thread.new do
+          # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
+          ActiveRecord::Base.connection_pool.with_connection do |connection|
+            conn = connection.instance_variable_get(:@connection)
+            begin
+              conn.async_exec "LISTEN logs"
+              while true
+                conn.wait_for_notify do |channel, pid, payload|
+                  puts "Received a NOTIFY on channel #{channel}"
+                  puts "from PG backend #{pid}"
+                  puts "saying #{payload}"
+                  @channel.push true
+                end
+              end
+            ensure
+              # Don't want the connection to still be listening once we return
+              # it to the pool - could result in weird behavior for the next
+              # thread to check it out.
+              conn.async_exec "UNLISTEN *"
+            end
+          end
+        end
       end
-    end
 
+      # Return async Rack response
+      ws.rack_response
+    else
+      @app.call env
+    end
   end
 
+end
+
 
index 25d73178c771a47c085fa32bd19b9c6e9676a1c7..f0f846c99e4fcb70f4445e8cd68d0f26a99aaa20 100644 (file)
@@ -295,6 +295,7 @@ class ArvadosModel < ActiveRecord::Base
     log = Log.new(event_type: event_type).fill_object(self)
     yield log
     log.save!
+    connection.execute "NOTIFY logs"
     log_start_state
   end