12055: Merge branch 'master' into 12055-nodemanager-ec2-tags
[arvados.git] / services / ws / session_v0.go
index db60738bdb68e5812cb09f25501b43f0fd6b1402..4fbfc489cf30fe0e56425e37d909c250f83d967d 100644 (file)
@@ -162,7 +162,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
        if sub.LastLogID == 0 {
                return
        }
-       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
        // Here we do a "select id" query and queue an event for every
        // log since the given ID, then use (*event)Detail() to
        // retrieve the whole row and decide whether to send it. This
@@ -177,17 +177,26 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                sub.LastLogID,
                time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
        if err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
                return
        }
-       defer rows.Close()
+
+       var ids []uint64
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
                if err != nil {
-                       sess.log.WithError(err).Error("row Scan failed")
+                       sess.log.WithError(err).Error("sendOldEvents row Scan failed")
                        continue
                }
+               ids = append(ids, id)
+       }
+       if err := rows.Err(); err != nil {
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
+       }
+       rows.Close()
+
+       for _, id := range ids {
                for len(sess.sendq)*2 > cap(sess.sendq) {
                        // Ugly... but if we fill up the whole client
                        // queue with a backlog of old events, a
@@ -196,6 +205,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
+                       if sess.ws.Request().Context().Err() != nil {
+                               // Session terminated while we were sleeping
+                               return
+                       }
                }
                now := time.Now()
                e := &event{
@@ -212,9 +225,6 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        }
                }
        }
-       if err := rows.Err(); err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
-       }
 }
 
 type v0subscribe struct {