From: Tom Clegg Date: Thu, 17 Aug 2017 19:27:59 +0000 (-0400) Subject: 12123: Implement "unsubscribe" method. X-Git-Tag: 1.1.0~75^2~1 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/a68b0ba89ab00dddbc1e59872ed81c08a6113295 12123: Implement "unsubscribe" method. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 4fbfc489cf..b57abb5cac 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -8,6 +8,7 @@ import ( "database/sql" "encoding/json" "errors" + "reflect" "sync" "sync/atomic" "time" @@ -86,6 +87,24 @@ func (sess *v0session) Receive(buf []byte) error { sess.mtx.Unlock() sub.sendOldEvents(sess) return nil + } else if sub.Method == "unsubscribe" { + sess.mtx.Lock() + found := false + for i, s := range sess.subscriptions { + if !reflect.DeepEqual(s.Filters, sub.Filters) { + continue + } + copy(sess.subscriptions[i:], sess.subscriptions[i+1:]) + sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1] + found = true + break + } + sess.mtx.Unlock() + sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe") + if found { + sess.sendq <- v0subscribeOK + return nil + } } else { sess.log.WithField("Method", sub.Method).Info("unknown method") } diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go index 9f743e0b5e..7585bc5e17 100644 --- a/services/ws/session_v0_test.go +++ b/services/ws/session_v0_test.go @@ -71,15 +71,28 @@ func (s *v0Suite) TestFilters(c *check.C) { conn, r, w := s.testClient() defer conn.Close() - c.Check(w.Encode(map[string]interface{}{ - "method": "subscribe", - "filters": [][]interface{}{{"event_type", "in", []string{"update"}}}, - }), check.IsNil) - s.expectStatus(c, r, 200) + cmd := func(method, eventType string, status int) { + c.Check(w.Encode(map[string]interface{}{ + "method": method, + "filters": [][]interface{}{{"event_type", "in", []string{eventType}}}, + }), check.IsNil) + s.expectStatus(c, r, status) + } + cmd("subscribe", "update", 200) + cmd("subscribe", "update", 200) + cmd("subscribe", "create", 200) + cmd("subscribe", "update", 200) + cmd("unsubscribe", "blip", 400) + cmd("unsubscribe", "create", 200) + cmd("unsubscribe", "update", 200) go s.emitEvents(nil) lg := s.expectLog(c, r) c.Check(lg.EventType, check.Equals, "update") + + cmd("unsubscribe", "update", 200) + cmd("unsubscribe", "update", 200) + cmd("unsubscribe", "update", 400) } func (s *v0Suite) TestLastLogID(c *check.C) {