X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b0ba939812720869fca0a75b07d42518d4953345..7ef471d9d0513b0b06cf0007e575f16eda93ec29:/services/ws/handler.go diff --git a/services/ws/handler.go b/services/ws/handler.go index d2c119acfe..b07b78cc7c 100644 --- a/services/ws/handler.go +++ b/services/ws/handler.go @@ -7,6 +7,7 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/stats" ) type handler struct { @@ -15,7 +16,7 @@ type handler struct { QueueSize int mtx sync.Mutex - queues map[chan interface{}]struct{} + lastDelay map[chan interface{}]stats.Duration setupOnce sync.Once } @@ -26,10 +27,11 @@ type handlerStats struct { EventCount uint64 } -func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (stats handlerStats) { +func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) { h.setupOnce.Do(h.setup) ctx, cancel := context.WithCancel(ws.Request().Context()) + defer cancel() log := logger(ctx) incoming := eventSource.NewSink() @@ -37,11 +39,11 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC queue := make(chan interface{}, h.QueueSize) h.mtx.Lock() - h.queues[queue] = struct{}{} + h.lastDelay[queue] = 0 h.mtx.Unlock() defer func() { h.mtx.Lock() - delete(h.queues, queue) + delete(h.lastDelay, queue) h.mtx.Unlock() }() @@ -131,11 +133,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC log.Debug("sent") if e != nil { - stats.QueueDelayNs += t0.Sub(e.Received) + hStats.QueueDelayNs += t0.Sub(e.Ready) + h.mtx.Lock() + h.lastDelay[queue] = stats.Duration(time.Since(e.Ready)) + h.mtx.Unlock() } - stats.WriteDelayNs += time.Since(t0) - stats.EventBytes += uint64(len(buf)) - stats.EventCount++ + hStats.WriteDelayNs += time.Since(t0) + hStats.EventBytes += uint64(len(buf)) + hStats.EventCount++ } }() @@ -188,26 +193,38 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC return } -func (h *handler) Status() interface{} { +func (h *handler) DebugStatus() interface{} { h.mtx.Lock() defer h.mtx.Unlock() var s struct { - QueueCount int - QueueMax int - QueueTotal uint64 + QueueCount int + QueueMin int + QueueMax int + QueueTotal uint64 + QueueDelayMin stats.Duration + QueueDelayMax stats.Duration } - for q := range h.queues { + for q, lastDelay := range h.lastDelay { + s.QueueCount++ n := len(q) s.QueueTotal += uint64(n) if s.QueueMax < n { s.QueueMax = n } + if s.QueueMin > n || s.QueueCount == 1 { + s.QueueMin = n + } + if (s.QueueDelayMin > lastDelay || s.QueueDelayMin == 0) && lastDelay > 0 { + s.QueueDelayMin = lastDelay + } + if s.QueueDelayMax < lastDelay { + s.QueueDelayMax = lastDelay + } } - s.QueueCount = len(h.queues) return &s } func (h *handler) setup() { - h.queues = make(map[chan interface{}]struct{}) + h.lastDelay = make(map[chan interface{}]stats.Duration) }