X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a4724fe92e651abb06acf8c5e75184561a55c854..517d3fca54225873d36f94083f3b7056ce271f46:/services/api/lib/eventbus.rb diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb index 79315aaf6f..50400ee86b 100644 --- a/services/api/lib/eventbus.rb +++ b/services/api/lib/eventbus.rb @@ -72,14 +72,14 @@ class EventBus cond_out = [] param_out = [] ws.filters.each do |filter| - ft = record_filters filter.filters, Log.table_name + ft = record_filters filter.filters, Log cond_out += ft[:cond_out] param_out += ft[:param_out] end # Add filters to query if cond_out.any? - logs = logs.where(cond_out.join(' OR '), *param_out) + logs = logs.where('(' + cond_out.join(') OR (') + ')', *param_out) end # Finally execute query and actually send the matching log rows @@ -99,6 +99,57 @@ class EventBus end end + # Handle inbound subscribe or unsubscribe message. + def handle_message ws, event + begin + # Parse event data as JSON + p = (Oj.load event.data).symbolize_keys + + if p[:method] == 'subscribe' + # Handle subscribe event + + if p[:last_log_id] + # Set or reset the last_log_id. The event bus only reports events + # for rows that come after last_log_id. + ws.last_log_id = p[:last_log_id].to_i + end + + if ws.filters.length < MAX_FILTERS + # Add a filter. This gets the :filters field which is the same + # format as used for regular index queries. + ws.filters << Filter.new(p) + ws.send ({status: 200, message: 'subscribe ok'}.to_json) + + # Send any pending events + push_events ws + else + ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json) + end + + elsif p[:method] == 'unsubscribe' + # Handle unsubscribe event + + len = ws.filters.length + ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) } + if ws.filters.length < len + ws.send ({status: 200, message: 'unsubscribe ok'}.to_json) + else + ws.send ({status: 404, message: 'filter not found'}.to_json) + end + + else + ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) + end + rescue Oj::Error => e + ws.send ({status: 400, message: "malformed request"}.to_json) + rescue Exception => e + puts "Error handling message: #{$!}" + puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}" + ws.send ({status: 500, message: 'error'}.to_json) + ws.close + end + end + # Constant maximum number of filters, to avoid silly huge database queries. MAX_FILTERS = 16 @@ -126,53 +177,7 @@ class EventBus # Set up callback for inbound message dispatch. ws.on :message do |event| - begin - # Parse event data as JSON - p = (Oj.load event.data).symbolize_keys - - if p[:method] == 'subscribe' - # Handle subscribe event - - if p[:last_log_id] - # Set or reset the last_log_id. The event bus only reports events - # for rows that come after last_log_id. - ws.last_log_id = p[:last_log_id].to_i - end - - if ws.filters.length < MAX_FILTERS - # Add a filter. This gets the :filters field which is the same - # format as used for regular index queries. - ws.filters << Filter.new(p) - ws.send ({status: 200, message: 'subscribe ok'}.to_json) - - # Send any pending events - push_events ws - else - ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json) - end - - elsif p[:method] == 'unsubscribe' - # Handle unsubscribe event - - len = ws.filters.length - ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) } - if ws.filters.length < len - ws.send ({status: 200, message: 'unsubscribe ok'}.to_json) - else - ws.send ({status: 404, message: 'filter not found'}.to_json) - end - - else - ws.send ({status: 400, message: "missing or unrecognized method"}.to_json) - end - rescue Oj::Error => e - ws.send ({status: 400, message: "malformed request"}.to_json) - rescue Exception => e - puts "Error handling message: #{$!}" - puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}" - ws.send ({status: 500, message: 'error'}.to_json) - ws.close - end + handle_message ws, event end # Set up socket close callback @@ -209,6 +214,7 @@ class EventBus conn.async_exec "UNLISTEN *" end end + @bgthread = false end end end