X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/50a4212ec9319d9a700ba5cdde12b6f72e3a96f9..ba418300c50e1375ca9938562579b7bd6bf9490d:/services/ws/handler.go diff --git a/services/ws/handler.go b/services/ws/handler.go index 913b1ee800..912643ad97 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package ws import ( "context" @@ -12,6 +12,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/stats" + "github.com/sirupsen/logrus" ) type handler struct { @@ -31,12 +32,11 @@ type handlerStats struct { EventCount uint64 } -func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) { +func (h *handler) Handle(ws wsConn, logger logrus.FieldLogger, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) { h.setupOnce.Do(h.setup) ctx, cancel := context.WithCancel(ws.Request().Context()) defer cancel() - log := logger(ctx) incoming := eventSource.NewSink() defer incoming.Stop() @@ -53,7 +53,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC sess, err := newSession(ws, queue) if err != nil { - log.WithError(err).Error("newSession failed") + logger.WithError(err).Error("newSession failed") return } @@ -71,19 +71,19 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour)) n, err := ws.Read(buf) buf := buf[:n] - log.WithField("frame", string(buf[:n])).Debug("received frame") + logger.WithField("frame", string(buf[:n])).Debug("received frame") if err == nil && n == cap(buf) { err = errFrameTooBig } if err != nil { if err != io.EOF && ctx.Err() == nil { - log.WithError(err).Info("read error") + logger.WithError(err).Info("read error") } return } err = sess.Receive(buf) if err != nil { - log.WithError(err).Error("sess.Receive() failed") + logger.WithError(err).Error("sess.Receive() failed") return } } @@ -108,38 +108,38 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC var e *event var buf []byte var err error - log := log + logger := logger switch data := data.(type) { case []byte: buf = data case *event: e = data - log = log.WithField("serial", e.Serial) + logger = logger.WithField("serial", e.Serial) buf, err = sess.EventMessage(e) if err != nil { - log.WithError(err).Error("EventMessage failed") + logger.WithError(err).Error("EventMessage failed") return } else if len(buf) == 0 { - log.Debug("skip") + logger.Debug("skip") continue } default: - log.WithField("data", data).Error("bad object in client queue") + logger.WithField("data", data).Error("bad object in client queue") continue } - log.WithField("frame", string(buf)).Debug("send event") + logger.WithField("frame", string(buf)).Debug("send event") ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) t0 := time.Now() _, err = ws.Write(buf) if err != nil { if ctx.Err() == nil { - log.WithError(err).Error("write failed") + logger.WithError(err).Error("write failed") } return } - log.Debug("sent") + logger.Debug("sent") if e != nil { hStats.QueueDelayNs += t0.Sub(e.Ready) @@ -189,7 +189,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC select { case queue <- e: default: - log.WithError(errQueueFull).Error("terminate") + logger.WithError(errQueueFull).Error("terminate") return } }