X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/10402f74ccdae453ae353c389a02c0415adcf1e7..b3919687f18582ccff1a6316846dcb04d9b5f989:/services/ws/session_v0.go diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 4fbfc489cf..309352b39e 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -2,18 +2,20 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package ws import ( "database/sql" "encoding/json" "errors" + "reflect" "sync" "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/Sirupsen/logrus" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/sirupsen/logrus" ) var ( @@ -43,7 +45,7 @@ type v0session struct { permChecker permChecker subscriptions []v0subscribe lastMsgID uint64 - log *logrus.Entry + log logrus.FieldLogger mtx sync.Mutex setupOnce sync.Once } @@ -58,7 +60,7 @@ func newSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecke db: db, ac: ac, permChecker: pc, - log: logger(ws.Request().Context()), + log: ctxlog.FromContext(ws.Request().Context()), } err := ws.Request().ParseForm() @@ -86,6 +88,24 @@ func (sess *v0session) Receive(buf []byte) error { sess.mtx.Unlock() sub.sendOldEvents(sess) return nil + } else if sub.Method == "unsubscribe" { + sess.mtx.Lock() + found := false + for i, s := range sess.subscriptions { + if !reflect.DeepEqual(s.Filters, sub.Filters) { + continue + } + copy(sess.subscriptions[i:], sess.subscriptions[i+1:]) + sess.subscriptions = sess.subscriptions[:len(sess.subscriptions)-1] + found = true + break + } + sess.mtx.Unlock() + sess.log.WithField("sub", sub).WithField("found", found).Debug("unsubscribe") + if found { + sess.sendq <- v0subscribeOK + return nil + } } else { sess.log.WithField("Method", sub.Method).Info("unknown method") } @@ -109,7 +129,7 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) { } else { permTarget = detail.ObjectUUID } - ok, err := sess.permChecker.Check(permTarget) + ok, err := sess.permChecker.Check(sess.ws.Request().Context(), permTarget) if err != nil || !ok { return nil, err }