9945: Merge branch 'master' into 9945-make-python-package-dependency-free
[arvados.git] / services / ws / session_v0.go
index db60738bdb68e5812cb09f25501b43f0fd6b1402..7bd1498158304ea3ab8a969c0c90129241ab1028 100644 (file)
@@ -8,12 +8,13 @@ import (
        "database/sql"
        "encoding/json"
        "errors"
+       "reflect"
        "sync"
        "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
+       "github.com/sirupsen/logrus"
 )
 
 var (
@@ -86,6 +87,24 @@ func (sess *v0session) Receive(buf []byte) error {
                sess.mtx.Unlock()
                sub.sendOldEvents(sess)
                return nil
+       } else if sub.Method == "unsubscribe" {
+               sess.mtx.Lock()
+               found := false
+               for i, s := range sess.subscriptions {
+                       if !reflect.DeepEqual(s.Filters, sub.Filters) {
+                               continue
+                       }
+                       copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+                       sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+                       found = true
+                       break
+               }
+               sess.mtx.Unlock()
+               sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+               if found {
+                       sess.sendq <- v0subscribeOK
+                       return nil
+               }
        } else {
                sess.log.WithField("Method", sub.Method).Info("unknown method")
        }
@@ -162,7 +181,7 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
        if sub.LastLogID == 0 {
                return
        }
-       sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
+       sess.log.WithField("LastLogID", sub.LastLogID).Debug("sendOldEvents")
        // Here we do a "select id" query and queue an event for every
        // log since the given ID, then use (*event)Detail() to
        // retrieve the whole row and decide whether to send it. This
@@ -177,17 +196,26 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                sub.LastLogID,
                time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
        if err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
                return
        }
-       defer rows.Close()
+
+       var ids []uint64
        for rows.Next() {
                var id uint64
                err := rows.Scan(&id)
                if err != nil {
-                       sess.log.WithError(err).Error("row Scan failed")
+                       sess.log.WithError(err).Error("sendOldEvents row Scan failed")
                        continue
                }
+               ids = append(ids, id)
+       }
+       if err := rows.Err(); err != nil {
+               sess.log.WithError(err).Error("sendOldEvents db.Query failed")
+       }
+       rows.Close()
+
+       for _, id := range ids {
                for len(sess.sendq)*2 > cap(sess.sendq) {
                        // Ugly... but if we fill up the whole client
                        // queue with a backlog of old events, a
@@ -196,6 +224,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
+                       if sess.ws.Request().Context().Err() != nil {
+                               // Session terminated while we were sleeping
+                               return
+                       }
                }
                now := time.Now()
                e := &event{
@@ -212,9 +244,6 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        }
                }
        }
-       if err := rows.Err(); err != nil {
-               sess.log.WithError(err).Error("db.Query failed")
-       }
 }
 
 type v0subscribe struct {