"database/sql"
"encoding/json"
"errors"
+ "reflect"
"sync"
"sync/atomic"
"time"
sess.mtx.Unlock()
sub.sendOldEvents(sess)
return nil
+ } else if sub.Method == "unsubscribe" {
+ sess.mtx.Lock()
+ found := false
+ for i, s := range sess.subscriptions {
+ if !reflect.DeepEqual(s.Filters, sub.Filters) {
+ continue
+ }
+ copy(sess.subscriptions[i:], sess.subscriptions[i+1:])
+ sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1]
+ found = true
+ break
+ }
+ sess.mtx.Unlock()
+ sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe")
+ if found {
+ sess.sendq <- v0subscribeOK
+ return nil
+ }
} else {
sess.log.WithField("Method", sub.Method).Info("unknown method")
}