Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / services / api / lib / eventbus.rb
index e7f2bb13108e327d533a673caae31bdb8e7f09e6..5e413d5cabf580688693754af4201497f2d79e0a 100644 (file)
@@ -78,6 +78,10 @@ class EventBus
     @connection_count = 0
   end
 
+  def send_message(ws, obj)
+    ws.send(Oj.dump(obj, mode: :compat))
+  end
+
   # Push out any pending events to the connection +ws+
   # +notify_id+  the id of the most recent row in the log table, may be nil
   #
@@ -143,10 +147,10 @@ class EventBus
         #
         # Note: find_each implies order('id asc'), which is what we
         # want.
-        logs.select(:id).find_each do |l|
+        logs.select('logs.id').find_each do |l|
           if not ws.sent_ids.include?(l.id)
             # only send if not a duplicate
-            ws.send(Log.find(l.id).as_api_response.to_json)
+            send_message(ws, Log.find(l.id).as_api_response)
           end
           if not ws.last_log_id.nil?
             # record ids only when sending "catchup" messages, not notifies
@@ -158,12 +162,12 @@ class EventBus
     rescue ArgumentError => e
       # There was some kind of user error.
       Rails.logger.warn "Error publishing event: #{$!}"
-      ws.send ({status: 500, message: $!}.to_json)
+      send_message(ws, {status: 500, message: $!})
       ws.close
     rescue => e
       Rails.logger.warn "Error publishing event: #{$!}"
       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-      ws.send ({status: 500, message: $!}.to_json)
+      send_message(ws, {status: 500, message: $!})
       ws.close
       # These exceptions typically indicate serious server trouble:
       # out of memory issues, database connection problems, etc.  Go ahead and
@@ -180,7 +184,7 @@ class EventBus
         p = (Oj.strict_load event.data).symbolize_keys
         filter = Filter.new(p)
       rescue Oj::Error => e
-        ws.send ({status: 400, message: "malformed request"}.to_json)
+        send_message(ws, {status: 400, message: "malformed request"})
         return
       end
 
@@ -200,12 +204,12 @@ class EventBus
           # Add a filter.  This gets the :filters field which is the same
           # format as used for regular index queries.
           ws.filters << filter
-          ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
+          send_message(ws, {status: 200, message: 'subscribe ok', filter: p})
 
           # Send any pending events
           push_events ws, nil
         else
-          ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
+          send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"})
         end
 
       elsif p[:method] == 'unsubscribe'
@@ -214,18 +218,18 @@ class EventBus
         len = ws.filters.length
         ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
         if ws.filters.length < len
-          ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+          send_message(ws, {status: 200, message: 'unsubscribe ok'})
         else
-          ws.send ({status: 404, message: 'filter not found'}.to_json)
+          send_message(ws, {status: 404, message: 'filter not found'})
         end
 
       else
-        ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+        send_message(ws, {status: 400, message: "missing or unrecognized method"})
       end
     rescue => e
       Rails.logger.warn "Error handling message: #{$!}"
       Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-      ws.send ({status: 500, message: 'error'}.to_json)
+      send_message(ws, {status: 500, message: 'error'})
       ws.close
     end
   end
@@ -241,8 +245,13 @@ class EventBus
     # Disconnect if no valid API token.
     # current_user is included from CurrentApiClient
     if not current_user
-      ws.send ({status: 401, message: "Valid API token required"}.to_json)
-      ws.close
+      send_message(ws, {status: 401, message: "Valid API token required"})
+      # Wait for the handshake to complete before closing the
+      # socket. Otherwise, nginx responds with HTTP 502 Bad gateway,
+      # and the client never sees our real error message.
+      ws.on :open do |event|
+        ws.close
+      end
       return
     end
 
@@ -262,7 +271,7 @@ class EventBus
     # forward them to the thread associated with the connection.
     sub = @channel.subscribe do |msg|
       if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
-        ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
+        send_message(ws, {status: 500, message: 'Notify backlog too long'})
         ws.close
         @channel.unsubscribe sub
         ws.queue.clear
@@ -307,6 +316,7 @@ class EventBus
         @mtx.synchronize do
           @connection_count -= 1
         end
+        ActiveRecord::Base.connection.close
       end
     end