X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8c014223e42683d308798475c021bad7a794e998..d637371ad13e3f4851313f241270b257ff5f9256:/services/ws/handler.go diff --git a/services/ws/handler.go b/services/ws/handler.go index dace39be35..912643ad97 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -1,13 +1,18 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package ws import ( "context" - "fmt" "io" "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/stats" + "github.com/sirupsen/logrus" ) type handler struct { @@ -16,7 +21,7 @@ type handler struct { QueueSize int mtx sync.Mutex - lastDelay map[chan interface{}]time.Duration + lastDelay map[chan interface{}]stats.Duration setupOnce sync.Once } @@ -27,11 +32,11 @@ type handlerStats struct { EventCount uint64 } -func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (stats handlerStats) { +func (h *handler) Handle(ws wsConn, logger logrus.FieldLogger, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) { h.setupOnce.Do(h.setup) ctx, cancel := context.WithCancel(ws.Request().Context()) - log := logger(ctx) + defer cancel() incoming := eventSource.NewSink() defer incoming.Stop() @@ -48,11 +53,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC sess, err := newSession(ws, queue) if err != nil { - log.WithError(err).Error("newSession failed") + logger.WithError(err).Error("newSession failed") return } + // Receive websocket frames from the client and pass them to + // sess.Receive(). go func() { + defer cancel() buf := make([]byte, 2<<20) for { select { @@ -63,27 +71,29 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour)) n, err := ws.Read(buf) buf := buf[:n] - log.WithField("frame", string(buf[:n])).Debug("received frame") + logger.WithField("frame", string(buf[:n])).Debug("received frame") if err == nil && n == cap(buf) { err = errFrameTooBig } if err != nil { - if err != io.EOF { - log.WithError(err).Info("read error") + if err != io.EOF && ctx.Err() == nil { + logger.WithError(err).Info("read error") } - cancel() return } err = sess.Receive(buf) if err != nil { - log.WithError(err).Error("sess.Receive() failed") - cancel() + logger.WithError(err).Error("sess.Receive() failed") return } } }() + // Take items from the outgoing queue, serialize them using + // sess.EventMessage() as needed, and send them to the client + // as websocket frames. go func() { + defer cancel() for { var ok bool var data interface{} @@ -98,48 +108,48 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC var e *event var buf []byte var err error - log := log + logger := logger switch data := data.(type) { case []byte: buf = data case *event: e = data - log = log.WithField("serial", e.Serial) + logger = logger.WithField("serial", e.Serial) buf, err = sess.EventMessage(e) if err != nil { - log.WithError(err).Error("EventMessage failed") - cancel() - break + logger.WithError(err).Error("EventMessage failed") + return } else if len(buf) == 0 { - log.Debug("skip") + logger.Debug("skip") continue } default: - log.WithField("data", data).Error("bad object in client queue") + logger.WithField("data", data).Error("bad object in client queue") continue } - log.WithField("frame", string(buf)).Debug("send event") + logger.WithField("frame", string(buf)).Debug("send event") ws.SetWriteDeadline(time.Now().Add(h.PingTimeout)) t0 := time.Now() _, err = ws.Write(buf) if err != nil { - log.WithError(err).Error("write failed") - cancel() - break + if ctx.Err() == nil { + logger.WithError(err).Error("write failed") + } + return } - log.Debug("sent") + logger.Debug("sent") if e != nil { - stats.QueueDelayNs += t0.Sub(e.Ready) + hStats.QueueDelayNs += t0.Sub(e.Ready) h.mtx.Lock() - h.lastDelay[queue] = time.Since(e.Ready) + h.lastDelay[queue] = stats.Duration(time.Since(e.Ready)) h.mtx.Unlock() } - stats.WriteDelayNs += time.Since(t0) - stats.EventBytes += uint64(len(buf)) - stats.EventCount++ + hStats.WriteDelayNs += time.Since(t0) + hStats.EventBytes += uint64(len(buf)) + hStats.EventCount++ } }() @@ -149,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC // is done/cancelled or the incoming event stream ends. Shut // down the handler if the outgoing queue fills up. go func() { + defer cancel() ticker := time.NewTicker(h.PingTimeout) defer ticker.Stop() @@ -168,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC default: } } - continue case e, ok := <-incoming.Channel(): if !ok { - cancel() return } if !sess.Filter(e) { @@ -180,8 +189,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC select { case queue <- e: default: - log.WithError(errQueueFull).Error("terminate") - cancel() + logger.WithError(errQueueFull).Error("terminate") return } } @@ -192,7 +200,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC return } -func (h *handler) Status() interface{} { +func (h *handler) DebugStatus() interface{} { h.mtx.Lock() defer h.mtx.Unlock() @@ -201,10 +209,8 @@ func (h *handler) Status() interface{} { QueueMin int QueueMax int QueueTotal uint64 - queueDelayMin time.Duration - QueueDelayMin string - queueDelayMax time.Duration - QueueDelayMax string + QueueDelayMin stats.Duration + QueueDelayMax stats.Duration } for q, lastDelay := range h.lastDelay { s.QueueCount++ @@ -216,18 +222,16 @@ func (h *handler) Status() interface{} { if s.QueueMin > n || s.QueueCount == 1 { s.QueueMin = n } - if (s.queueDelayMin > lastDelay || s.queueDelayMin == 0) && lastDelay > 0 { - s.queueDelayMin = lastDelay + if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 { + s.QueueDelayMin = lastDelay } - if s.queueDelayMax < lastDelay { - s.queueDelayMax = lastDelay + if s.QueueDelayMax < lastDelay { + s.QueueDelayMax = lastDelay } } - s.QueueDelayMin = fmt.Sprintf("%.06f", s.queueDelayMin.Seconds()) - s.QueueDelayMax = fmt.Sprintf("%.06f", s.queueDelayMax.Seconds()) return &s } func (h *handler) setup() { - h.lastDelay = make(map[chan interface{}]time.Duration) + h.lastDelay = make(map[chan interface{}]stats.Duration) }