11960: Get the full list of old IDs up front in sendOldEvents.
author Tom Clegg <tom@curoverse.com>
Thu, 13 Jul 2017 21:59:22 +0000 (17:59 -0400)
committer Tom Clegg <tom@curoverse.com>
Thu, 13 Jul 2017 21:59:22 +0000 (17:59 -0400)
This avoids keeping the "get list of log IDs" database query open
while each log row gets looked up, queued, permission-checked,
serialized, and sent over the wire to the client.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curoverse.com>

services/ws/session_v0.go

index db60738bdb68e5812cb09f25501b43f0fd6b1402..58c64231cb53c1204ceed70b0ea030a7050ebb95 100644 (file)
@@ -162,7 +162,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
        if sub.LastLogID == 0 {
                return
        }
-       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
        // Here we do a "select id" query and queue an event for every
        // log since the given ID, then use (*event)Detail() to
        // retrieve the whole row and decide whether to send it. This
@@ -177,17 +177,26 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                sub.LastLogID,
                time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
        if err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
                return
        }
-       defer rows.Close()
+
+       var ids []uint64
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
                if err != nil {
-                       sess.log.WithError(err).Error("row Scan failed")
+                       sess.log.WithError(err).Error("sendOldEvents row Scan failed")
                        continue
                }
+               ids = append(ids, id)
+       }
+       if err := rows.Err(); err != nil {
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
+       }
+       rows.Close()
+
+       for _, id := range ids {
                for len(sess.sendq)*2 > cap(sess.sendq) {
                        // Ugly... but if we fill up the whole client
                        // queue with a backlog of old events, a
@@ -212,9 +221,6 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        }
                }
        }
-       if err := rows.Err(); err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
-       }
 }
 
 type v0subscribe struct {