Merge branch 'master' into 4062-infinite-scroll-repeat-issue
[arvados.git] / services / api / lib / eventbus.rb
index 50400ee86bf1b6827cbc64dddc024bf8d47e5364..bccbeea4bb62ed951cdcc3f6a6ad2d2ac9098d7d 100644 (file)
@@ -57,29 +57,40 @@ class EventBus
           # Start with log rows readable by user, sorted in ascending order
           logs = Log.readable_by(ws.user).order("id asc")
 
+          cond_id = nil
+          cond_out = []
+          param_out = []
+
           if ws.last_log_id
             # Client is only interested in log rows that are newer than the
             # last log row seen by the client.
-            logs = logs.where("logs.id > ?", ws.last_log_id)
+            cond_id = "logs.id > ?"
+            param_out << ws.last_log_id
           elsif id
             # No last log id, so only look at the most recently changed row
-            logs = logs.where("logs.id = ?", id.to_i)
+            cond_id = "logs.id = ?"
+            param_out << id.to_i
           else
             return
           end
 
           # Now process filters provided by client
-          cond_out = []
-          param_out = []
           ws.filters.each do |filter|
             ft = record_filters filter.filters, Log
-            cond_out += ft[:cond_out]
-            param_out += ft[:param_out]
+            if ft[:cond_out].any?
+              # Join the clauses within a single subscription filter with AND
+              # so it is consistent with regular queries
+              cond_out << "(#{ft[:cond_out].join ') AND ('})"
+              param_out += ft[:param_out]
+            end
           end
 
           # Add filters to query
           if cond_out.any?
-            logs = logs.where('(' + cond_out.join(') OR (') + ')', *param_out)
+            # Join subscriptions with OR
+            logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
+          else
+            logs = logs.where(cond_id, *param_out)
           end
 
           # Finally execute query and actually send the matching log rows
@@ -92,8 +103,8 @@ class EventBus
           ws.last_log_id = id.to_i
         end
       rescue Exception => e
-        puts "Error publishing event: #{$!}"
-        puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+        Rails.logger.warn "Error publishing event: #{$!}"
+        Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
         ws.send ({status: 500, message: 'error'}.to_json)
         ws.close
       end
@@ -118,7 +129,7 @@ class EventBus
           # Add a filter.  This gets the :filters field which is the same
           # format as used for regular index queries.
           ws.filters << Filter.new(p)
-          ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+          ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
 
           # Send any pending events
           push_events ws
@@ -143,8 +154,8 @@ class EventBus
     rescue Oj::Error => e
       ws.send ({status: 400, message: "malformed request"}.to_json)
     rescue Exception => e
-      puts "Error handling message: #{$!}"
-      puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+      Rails.logger.warn "Error handling message: #{$!}"
+      Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
       ws.send ({status: 500, message: 'error'}.to_json)
       ws.close
     end