X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/74d5bfb293b2acf76d639df12ff8769bc333a5f2..1667f9860de21d29bbe32bb827db29eca62d9aeb:/services/ws/handler.go diff --git a/services/ws/handler.go b/services/ws/handler.go index 91a77022d6..72291900fa 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -3,16 +3,21 @@ package main import ( "context" "io" + "sync" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/stats" ) type handler struct { Client arvados.Client PingTimeout time.Duration QueueSize int - NewSession func(wsConn, chan<- interface{}) (session, error) + + mtx sync.Mutex + lastDelay map[chan interface{}]stats.Duration + setupOnce sync.Once } type handlerStats struct { @@ -22,16 +27,34 @@ type handlerStats struct { EventCount uint64 } -func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) { +func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) { + h.setupOnce.Do(h.setup) + ctx, cancel := context.WithCancel(ws.Request().Context()) + defer cancel() log := logger(ctx) + + incoming := eventSource.NewSink() + defer incoming.Stop() + queue := make(chan interface{}, h.QueueSize) - sess, err := h.NewSession(ws, queue) + h.mtx.Lock() + h.lastDelay[queue] = 0 + h.mtx.Unlock() + defer func() { + h.mtx.Lock() + delete(h.lastDelay, queue) + h.mtx.Unlock() + }() + + sess, err := newSession(ws, queue) if err != nil { - log.WithError(err).Error("NewSession failed") + log.WithError(err).Error("newSession failed") return } + // Receive websocket frames from the client and pass them to + // sess.Receive(). go func() { buf := make([]byte, 2<<20) for { @@ -63,6 +86,9 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) } }() + // Take items from the outgoing queue, serialize them using + // sess.EventMessage() as needed, and send them to the client + // as websocket frames. go func() { for { var ok bool @@ -112,11 +138,14 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) log.Debug("sent") if e != nil { - stats.QueueDelayNs += t0.Sub(e.Received) + hStats.QueueDelayNs += t0.Sub(e.Ready) + h.mtx.Lock() + h.lastDelay[queue] = stats.Duration(time.Since(e.Ready)) + h.mtx.Unlock() } - stats.WriteDelayNs += time.Since(t0) - stats.EventBytes += uint64(len(buf)) - stats.EventCount++ + hStats.WriteDelayNs += time.Since(t0) + hStats.EventBytes += uint64(len(buf)) + hStats.EventCount++ } }() @@ -146,7 +175,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) } } continue - case e, ok := <-incoming: + case e, ok := <-incoming.Channel(): if !ok { cancel() return @@ -168,3 +197,39 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) <-ctx.Done() return } + +func (h *handler) DebugStatus() interface{} { + h.mtx.Lock() + defer h.mtx.Unlock() + + var s struct { + QueueCount int + QueueMin int + QueueMax int + QueueTotal uint64 + QueueDelayMin stats.Duration + QueueDelayMax stats.Duration + } + for q, lastDelay := range h.lastDelay { + s.QueueCount++ + n := len(q) + s.QueueTotal += uint64(n) + if s.QueueMax < n { + s.QueueMax = n + } + if s.QueueMin > n || s.QueueCount == 1 { + s.QueueMin = n + } + if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 { + s.QueueDelayMin = lastDelay + } + if s.QueueDelayMax < lastDelay { + s.QueueDelayMax = lastDelay + } + } + return &s +} + +func (h *handler) setup() { + h.lastDelay = make(map[chan interface{}]stats.Duration) +}