import (
"context"
"io"
+ "sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
)
// handler holds the settings used by Handle together with the per-queue
// delay bookkeeping that DebugStatus reports.
type handler struct {
Client arvados.Client
PingTimeout time.Duration
QueueSize int // buffer capacity of the queue channel Handle creates for each call
- NewSession func(wsConn, chan<- interface{}) (session, error)
+
+ mtx sync.Mutex // held around every visible access to lastDelay
+ lastDelay map[chan interface{}]stats.Duration // last recorded delay per queue; entries added and removed by Handle, read by DebugStatus
+ setupOnce sync.Once // lets Handle run setup exactly once
}
// handlerStats is the result type of Handle.
// NOTE(review): Handle also updates QueueDelayNs, WriteDelayNs and
// EventBytes on this type; those fields are not visible in this excerpt,
// so the declaration shown here is presumably abbreviated.
type handlerStats struct {
EventCount uint64 // incremented by Handle alongside WriteDelayNs and EventBytes
}
-func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
// Handle sets up a session for ws: it obtains its own event sink from
// eventSource, creates a buffered outgoing queue, and builds the session
// with newSession(ws, queue). While Handle runs, the queue is registered
// in h.lastDelay so that DebugStatus can report on it.
//
// The named result is hStats rather than stats because the body refers
// to the imported stats package (stats.Duration), which a result named
// stats would shadow.
//
// NOTE(review): this excerpt shows only parts of the body; the code
// between the visible fragments is not shown here.
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (hStats handlerStats) {
+ h.setupOnce.Do(h.setup) // allocate h.lastDelay before the first write below
+
ctx, cancel := context.WithCancel(ws.Request().Context())
+ defer cancel() // cancel the derived context on every return path
log := logger(ctx)
+
+ incoming := eventSource.NewSink()
+ defer incoming.Stop() // stop the sink when Handle returns
+
queue := make(chan interface{}, h.QueueSize) // passed to newSession; DebugStatus reports len() of this channel
- sess, err := h.NewSession(ws, queue)
+ h.mtx.Lock()
+ h.lastDelay[queue] = 0 // register the queue; DebugStatus ignores zero delays when computing QueueDelayMin
+ h.mtx.Unlock()
+ defer func() { // unregister the queue when Handle returns
+ h.mtx.Lock()
+ delete(h.lastDelay, queue)
+ h.mtx.Unlock()
+ }()
+
+ sess, err := newSession(ws, queue)
if err != nil {
- log.WithError(err).Error("NewSession failed")
+ log.WithError(err).Error("newSession failed")
return // naked return: yields hStats as it stands (nothing visible above modifies it)
}
// NOTE(review): the lines from here to the closing "}()" appear to be the
// tail of a function literal whose beginning is not shown in this excerpt.
log.Debug("sent")
if e != nil {
- stats.QueueDelayNs += t0.Sub(e.Received)
+ hStats.QueueDelayNs += t0.Sub(e.Ready) // delay is now measured from e.Ready instead of e.Received
+ h.mtx.Lock()
+ h.lastDelay[queue] = stats.Duration(time.Since(e.Ready)) // latest delay for this queue, as read by DebugStatus
+ h.mtx.Unlock()
// NOTE(review): if this code can still run after Handle has returned, the
// write above would re-add the entry removed by the deferred delete —
// confirm against the full function.
}
- stats.WriteDelayNs += time.Since(t0)
- stats.EventBytes += uint64(len(buf))
- stats.EventCount++
+ hStats.WriteDelayNs += time.Since(t0)
+ hStats.EventBytes += uint64(len(buf))
+ hStats.EventCount++
}
}()
}
}
continue
- case e, ok := <-incoming:
+ case e, ok := <-incoming.Channel(): // events now arrive through the sink's channel
if !ok {
cancel() // the sink's channel was closed: cancel the context and stop
return
// NOTE(review): the two lines below appear to belong to a later part of
// the function than the "if !ok" branch above; the intervening lines are
// not shown in this excerpt.
<-ctx.Done()
return
}
+
+// DebugStatus summarizes the queues currently registered in h.lastDelay.
+// It returns a pointer to a struct holding the number of queues, the
+// smallest, largest and total queue lengths, and the smallest non-zero
+// and largest last-recorded delays. h.mtx is held for the whole scan.
+func (h *handler) DebugStatus() interface{} {
+	var status struct {
+		QueueCount    int
+		QueueMin      int
+		QueueMax      int
+		QueueTotal    uint64
+		QueueDelayMin stats.Duration
+		QueueDelayMax stats.Duration
+	}
+
+	h.mtx.Lock()
+	defer h.mtx.Unlock()
+
+	first := true
+	for queue, delay := range h.lastDelay {
+		depth := len(queue)
+		status.QueueCount++
+		status.QueueTotal += uint64(depth)
+
+		// The first queue seeds the minimum; later ones can only lower it.
+		if first || depth < status.QueueMin {
+			status.QueueMin = depth
+		}
+		first = false
+		if depth > status.QueueMax {
+			status.QueueMax = depth
+		}
+
+		// A zero delay is never taken as the minimum; zero in
+		// QueueDelayMin means "no non-zero delay seen yet".
+		if delay > 0 && (status.QueueDelayMin == 0 || delay < status.QueueDelayMin) {
+			status.QueueDelayMin = delay
+		}
+		if delay > status.QueueDelayMax {
+			status.QueueDelayMax = delay
+		}
+	}
+	return &status
+}
+
+// setup allocates the empty lastDelay map. Handle invokes it through
+// h.setupOnce before its first write to the map.
+func (h *handler) setup() {
+	h.lastDelay = map[chan interface{}]stats.Duration{}
+}