+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
+ if sub.LastLogID == 0 {
+ return
+ }
+ sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+ // Here we do a "select id" query and queue an event for every
+ // log since the given ID, then use (*event)Detail() to
+ // retrieve the whole row and decide whether to send it. This
+ // approach is very inefficient if the subscriber asks for
+ // last_log_id==1, even if the filters end up matching very
+ // few events.
+ //
+ // To mitigate this, filter on "created > 10 minutes ago" when
+ // retrieving the list of old event IDs to consider.
+ rows, err := sess.db.Query(
+ `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+ sub.LastLogID,
+ time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+ if err != nil {
+ sess.log.WithError(err).Error("db.Query failed")
+ return
+ }
+ for rows.Next() {
+ var id uint64
+ err := rows.Scan(&id)
+ if err != nil {
+ sess.log.WithError(err).Error("row Scan failed")
+ continue
+ }
+ for len(sess.sendq)*2 > cap(sess.sendq) {
+ // Ugly... but if we fill up the whole client
+ // queue with a backlog of old events, a
+ // single new event will overflow it and
+ // terminate the connection, and then the
+ // client will probably reconnect and do the
+ // same thing all over again.
+ time.Sleep(100 * time.Millisecond)
+ }
+ now := time.Now()
+ e := &event{
+ LogID: id,
+ Received: now,
+ Ready: now,
+ db: sess.db,
+ }
+ if sub.match(sess, e) {
+ select {
+ case sess.sendq <- e:
+ case <-sess.ws.Request().Context().Done():
+ return
+ }
+ }
+ }
+ if err := rows.Err(); err != nil {
+ sess.log.WithError(err).Error("db.Query failed")
+ }
+}
+