+ # Add filters to query
+ if cond_out.any?
+ # Join subscriptions with OR
+ logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
+ else
+ logs = logs.where(cond_id, *param_out)
+ end
+
+ # Execute query and actually send the matching log rows. Load
+ # the full log records only when we're ready to send them,
+ # though: otherwise, (1) postgres has to build the whole
+ # result set and return it to us before we can send the first
+ # event, and (2) we store lots of records in memory while
+ # waiting to spool them out to the client. Both of these are
+ # troublesome when log records are large (e.g., a collection
+ # update contains both old and new manifest_text).
+ #
+ # Note: find_each implies order('id asc'), which is what we
+ # want.
+ logs.select('logs.id').find_each do |l|
+ if not ws.sent_ids.include?(l.id)
+ # only send if not a duplicate
+ send_message(ws, Log.find(l.id).as_api_response)
+ end
+ if not ws.last_log_id.nil?
+ # record ids only when sending "catchup" messages, not notifies
+ ws.sent_ids << l.id