From 1a17734f7264bc74463e1e6fe115cdad6ec4c521 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 17 Nov 2016 14:49:46 -0500 Subject: [PATCH] 8460: Refactor "old events / other messages" mechanism to use the outgoing message queue. --- services/ws/handler.go | 91 +++++++++++++++++++-------------------- services/ws/router.go | 8 ++-- services/ws/session.go | 6 +-- services/ws/session_v0.go | 43 ++++++++++-------- services/ws/session_v1.go | 2 +- 5 files changed, 76 insertions(+), 74 deletions(-) diff --git a/services/ws/handler.go b/services/ws/handler.go index 2b94693610..3d42b9a208 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -13,7 +13,7 @@ type handler struct { Client arvados.Client PingTimeout time.Duration QueueSize int - NewSession func(wsConn) (session, error) + NewSession func(wsConn, chan<- interface{}) (session, error) } type handlerStats struct { @@ -23,18 +23,19 @@ type handlerStats struct { EventCount uint64 } -func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { +func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) { ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{ "RemoteAddr": ws.Request().RemoteAddr, })) - sess, err := h.NewSession(ws) + + queue := make(chan interface{}, h.QueueSize) + sess, err := h.NewSession(ws, queue) + log := logger(ctx) if err != nil { - logger(ctx).WithError(err).Error("NewSession failed") + log.WithError(err).Error("NewSession failed") return } - queue := make(chan interface{}, h.QueueSize) - stopped := make(chan struct{}) stop := make(chan error, 5) @@ -48,54 +49,53 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { } ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour)) n, err := ws.Read(buf) - logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame") - if err == nil && n == len(buf) { + buf := buf[:n] + log.WithField("frame", string(buf[:n])).Debug("received frame") + if err == nil && n == cap(buf) { err = errFrameTooBig } if err != nil { if err != io.EOF { - logger(ctx).WithError(err).Info("read error") + log.WithError(err).Info("read error") } stop <- err return } msg := make(map[string]interface{}) - err = json.Unmarshal(buf[:n], &msg) + err = json.Unmarshal(buf, &msg) if err != nil { - logger(ctx).WithError(err).Info("invalid json from client") + log.WithError(err).Info("invalid json from client") stop <- err return } - for _, buf := range sess.Receive(msg, buf[:n]) { - logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive") - queue <- buf - } + sess.Receive(msg, buf) } }() go func() { - for e := range queue { - if buf, ok := e.([]byte); ok { - ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) - logger(ctx).WithField("frame", string(buf)).Debug("send msg buf") - _, err := ws.Write(buf) + for data := range queue { + var e *event + var buf []byte + var err error + log := log + + switch data := data.(type) { + case []byte: + buf = data + case *event: + e = data + log = log.WithField("serial", e.Serial) + buf, err = sess.EventMessage(e) if err != nil { - logger(ctx).WithError(err).Error("write failed") + log.WithError(err).Error("EventMessage failed") stop <- err break + } else if len(buf) == 0 { + log.Debug("skip") + continue } - continue - } - e := e.(*event) - log := logger(ctx).WithField("serial", e.Serial) - - buf, err := sess.EventMessage(e) - if err != nil { - log.WithError(err).Error("EventMessage failed") - stop <- err - break - } else if len(buf) == 0 { - log.Debug("skip") + default: + log.WithField("data", data).Error("bad object in client queue") continue } @@ -118,6 +118,8 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { stats.EventCount++ } for _ = range queue { + // Ensure queue can't fill up and block other + // goroutines after we hit a write error. } }() @@ -127,20 +129,10 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { // channel closes or the incoming event stream ends. Shut down // the handler if the outgoing queue fills up. go func() { - send := func(e *event) { - select { - case queue <- e: - default: - stop <- errQueueFull - } - } - ticker := time.NewTicker(h.PingTimeout) defer ticker.Stop() for { - var e *event - var ok bool select { case <-stopped: close(queue) @@ -155,14 +147,19 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) { queue <- []byte(`{}`) } continue - case e, ok = <-events: + case e, ok := <-incoming: if !ok { close(queue) return } - } - if sess.Filter(e) { - send(e) + if !sess.Filter(e) { + continue + } + select { + case queue <- e: + default: + stop <- errQueueFull + } } } }() diff --git a/services/ws/router.go b/services/ws/router.go index 34656ad91a..e6cec0f5f1 100644 --- a/services/ws/router.go +++ b/services/ws/router.go @@ -31,18 +31,20 @@ type router struct { lastReqMtx sync.Mutex } +type sessionFactory func(wsConn, chan<- interface{}, arvados.Client, *sql.DB) (session, error) + func (rtr *router) setup() { rtr.mux = http.NewServeMux() rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0)) rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1)) } -func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server { +func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server { handler := &handler{ PingTimeout: rtr.Config.PingTimeout.Duration(), QueueSize: rtr.Config.ClientEventQueue, - NewSession: func(ws wsConn) (session, error) { - return newSession(ws, rtr.Config.Client, rtr.eventSource.DB()) + NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) { + return newSession(ws, sendq, rtr.Config.Client, rtr.eventSource.DB()) }, } return &websocket.Server{ diff --git a/services/ws/session.go b/services/ws/session.go index d148f59128..9c3cef1d9a 100644 --- a/services/ws/session.go +++ b/services/ws/session.go @@ -1,10 +1,8 @@ package main type session interface { - // Receive processes a message received from the client. If - // the returned list of messages is non-nil, they will be - // queued for sending to the client. - Receive(map[string]interface{}, []byte) [][]byte + // Receive processes a message received from the client. + Receive(map[string]interface{}, []byte) // Filter returns true if the event should be queued for // sending to the client. It should return as fast as diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go index 210c8c5d6f..41432821db 100644 --- a/services/ws/session_v0.go +++ b/services/ws/session_v0.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "sync" + "sync/atomic" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -23,16 +24,19 @@ var ( type v0session struct { ws wsConn + sendq chan<- interface{} db *sql.DB permChecker permChecker subscriptions []v0subscribe + lastMsgID uint64 log *log.Entry mtx sync.Mutex setupOnce sync.Once } -func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) { +func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) { sess := &v0session{ + sendq: sendq, ws: ws, db: db, permChecker: NewPermChecker(ac), @@ -51,23 +55,24 @@ func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) { return sess, nil } -func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte { +func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) { sess.log.WithField("data", msg).Debug("received message") var sub v0subscribe if err := json.Unmarshal(buf, &sub); err != nil { sess.log.WithError(err).Info("ignored invalid request") - return nil + return } if sub.Method == "subscribe" { sub.prepare(sess) sess.log.WithField("sub", sub).Debug("sub prepared") + sess.sendq <- v0subscribeOK sess.mtx.Lock() sess.subscriptions = append(sess.subscriptions, sub) sess.mtx.Unlock() - - return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...) + sub.sendOldEvents(sess) + return } - return [][]byte{v0subscribeFail} + sess.sendq <- v0subscribeFail } func (sess *v0session) EventMessage(e *event) ([]byte, error) { @@ -82,7 +87,7 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) { } msg := map[string]interface{}{ - "msgID": e.Serial, + "msgID": atomic.AddUint64(&sess.lastMsgID, 1), "id": detail.ID, "uuid": detail.UUID, "object_uuid": detail.ObjectUUID, @@ -122,7 +127,7 @@ func (sess *v0session) Filter(e *event) bool { return false } -func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) { +func (sub *v0subscribe) sendOldEvents(sess *v0session) { if sub.LastLogID == 0 { return } @@ -151,27 +156,27 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) { sess.log.WithError(err).Error("row Scan failed") continue } + for len(sess.sendq)*2 > cap(sess.sendq) { + // Ugly... but if we fill up the whole client + // queue with a backlog of old events, a + // single new event will overflow it and + // terminate the connection, and then the + // client will probably reconnect and do the + // same thing all over again. + time.Sleep(100 * time.Millisecond) + } e := &event{ LogID: id, Received: time.Now(), db: sess.db, } - if !sub.match(sess, e) { - sess.log.WithField("event", e).Debug("skip old event") - continue - } - msg, err := sess.EventMessage(e) - if err != nil { - sess.log.WithError(err).Error("event marshal failed") - continue + if sub.match(sess, e) { + sess.sendq <- e } - sess.log.WithField("data", msg).Debug("will queue old event") - msgs = append(msgs, msg) } if err := rows.Err(); err != nil { sess.log.WithError(err).Error("db.Query failed") } - return } type v0subscribe struct { diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go index 60d12c449c..88e2414611 100644 --- a/services/ws/session_v1.go +++ b/services/ws/session_v1.go @@ -7,6 +7,6 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvados" ) -func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) { +func NewSessionV1(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) { return nil, errors.New("Not implemented") } -- 2.30.2