12123: Implement "unsubscribe" method.
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 17 Aug 2017 19:27:59 +0000 (15:27 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 17 Aug 2017 19:32:32 +0000 (15:32 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

services/ws/session_v0.go
services/ws/session_v0_test.go

index 4fbfc489cf30fe0e56425e37d909c250f83d967d..b57abb5cac31b0d563528669514e3bdcee151327 100644 (file)
@@ -8,6 +8,7 @@ import (
        "database/sql"
        "encoding/json"
        "errors"
+       "reflect"
        "sync"
        "sync/atomic"
        "time"
@@ -86,6 +87,24 @@ func (sess *v0session) Receive(buf []byte) error {
                sess.mtx.Unlock()
                sub.sendOldEvents(sess)
                return nil
+       } else if sub.Method == "unsubscribe" {
+               sess.mtx.Lock()
+               found := false
+               for i, s := range sess.subscriptions {
+                       if !reflect.DeepEqual(s.Filters, sub.Filters) {
+                               continue
+                       }
+                       copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+                       sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+                       found = true
+                       break
+               }
+               sess.mtx.Unlock()
+               sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+               if found {
+                       sess.sendq <- v0subscribeOK
+                       return nil
+               }
        } else {
                sess.log.WithField("Method", sub.Method).Info("unknown method")
        }
index 9f743e0b5e3d58312d2b3a2636b148bd493b51e0..7585bc5e17e017dc095a141d550e4e609c877c94 100644 (file)
@@ -71,15 +71,28 @@ func (s *v0Suite) TestFilters(c *check.C) {
        conn, r, w := s.testClient()
        defer conn.Close()
 
-       c.Check(w.Encode(map[string]interface{}{
-               "method":  "subscribe",
-               "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
-       }), check.IsNil)
-       s.expectStatus(c, r, 200)
+       cmd := func(method, eventType string, status int) {
+               c.Check(w.Encode(map[string]interface{}{
+                       "method":  method,
+                       "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
+               }), check.IsNil)
+               s.expectStatus(c, r, status)
+       }
+       cmd("subscribe", "update", 200)
+       cmd("subscribe", "update", 200)
+       cmd("subscribe", "create", 200)
+       cmd("subscribe", "update", 200)
+       cmd("unsubscribe", "blip", 400)
+       cmd("unsubscribe", "create", 200)
+       cmd("unsubscribe", "update", 200)
 
        go s.emitEvents(nil)
        lg := s.expectLog(c, r)
        c.Check(lg.EventType, check.Equals, "update")
+
+       cmd("unsubscribe", "update", 200)
+       cmd("unsubscribe", "update", 200)
+       cmd("unsubscribe", "update", 400)
 }
 
 func (s *v0Suite) TestLastLogID(c *check.C) {