-func (h *handler) Handle(ws wsConn, events <-chan *event) {
- sess, err := h.NewSession(ws, h.Client)
+type handlerStats struct { // statistics value; Handle returns one as its named result hStats
+ QueueDelayNs time.Duration // NOTE(review): typed time.Duration despite the "Ns" suffix; presumably a queueing delay -- confirm where it is set
+ WriteDelayNs time.Duration // NOTE(review): presumably a delay attributed to writes; the code that sets it is not visible here -- confirm
+ EventBytes uint64 // presumably a running byte total for events -- TODO confirm against the code that updates it
+ EventCount uint64 // presumably the number of events counted -- TODO confirm against the code that updates it
+}
+
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+ h.setupOnce.Do(h.setup)
+
+ ctx, cancel := context.WithCancel(ws.Request().Context())
+ defer cancel()
+ log := logger(ctx)
+
+ incoming := eventSource.NewSink()
+ defer incoming.Stop()
+
+ queue := make(chan interface{}, h.QueueSize)
+ h.mtx.Lock()
+ h.lastDelay[queue] = 0
+ h.mtx.Unlock()
+ defer func() {
+ h.mtx.Lock()
+ delete(h.lastDelay, queue)
+ h.mtx.Unlock()
+ }()
+
+ sess, err := newSession(ws, queue)