8460: More statistics in status.json: events in/out, reqs received, lowest client...
author Tom Clegg <tom@curoverse.com>
Thu, 24 Nov 2016 19:18:59 +0000 (14:18 -0500)
committer Tom Clegg <tom@curoverse.com>
Thu, 24 Nov 2016 19:18:59 +0000 (14:18 -0500)
services/ws/event.go
services/ws/handler.go
services/ws/pg.go
services/ws/router.go
services/ws/session_v0.go

index 280035b9ef5840e9ff6c11fa8d4517f0ac8f850e..fa2a5df1b064c2d1b1bfc59a4cea63d54b471150 100644 (file)
@@ -22,6 +22,7 @@ type eventSource interface {
 type event struct {
        LogID    uint64
        Received time.Time
+       Ready    time.Time
        Serial   uint64
 
        db     *sql.DB
index d2c119acfeab53b5a4ed799455e8802114c7c9d1..dace39be35952c99d55e9cc89f4cefcde3978663 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "context"
+       "fmt"
        "io"
        "sync"
        "time"
@@ -15,7 +16,7 @@ type handler struct {
        QueueSize   int
 
        mtx       sync.Mutex
-       queues    map[chan interface{}]struct{}
+       lastDelay map[chan interface{}]time.Duration
        setupOnce sync.Once
 }
 
@@ -37,11 +38,11 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
 
        queue := make(chan interface{}, h.QueueSize)
        h.mtx.Lock()
-       h.queues[queue] = struct{}{}
+       h.lastDelay[queue] = 0
        h.mtx.Unlock()
        defer func() {
                h.mtx.Lock()
-               delete(h.queues, queue)
+               delete(h.lastDelay, queue)
                h.mtx.Unlock()
        }()
 
@@ -131,7 +132,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        log.Debug("sent")
 
                        if e != nil {
-                               stats.QueueDelayNs += t0.Sub(e.Received)
+                               stats.QueueDelayNs += t0.Sub(e.Ready)
+                               h.mtx.Lock()
+                               h.lastDelay[queue] = time.Since(e.Ready)
+                               h.mtx.Unlock()
                        }
                        stats.WriteDelayNs += time.Since(t0)
                        stats.EventBytes += uint64(len(buf))
@@ -193,21 +197,37 @@ func (h *handler) Status() interface{} {
        defer h.mtx.Unlock()
 
        var s struct {
-               QueueCount int
-               QueueMax   int
-               QueueTotal uint64
+               QueueCount    int
+               QueueMin      int
+               QueueMax      int
+               QueueTotal    uint64
+               queueDelayMin time.Duration
+               QueueDelayMin string
+               queueDelayMax time.Duration
+               QueueDelayMax string
        }
-       for q := range h.queues {
+       for q, lastDelay := range h.lastDelay {
+               s.QueueCount++
                n := len(q)
                s.QueueTotal += uint64(n)
                if s.QueueMax < n {
                        s.QueueMax = n
                }
+               if s.QueueMin > n || s.QueueCount == 1 {
+                       s.QueueMin = n
+               }
+               if (s.queueDelayMin > lastDelay || s.queueDelayMin == 0) && lastDelay > 0 {
+                       s.queueDelayMin = lastDelay
+               }
+               if s.queueDelayMax < lastDelay {
+                       s.queueDelayMax = lastDelay
+               }
        }
-       s.QueueCount = len(h.queues)
+       s.QueueDelayMin = fmt.Sprintf("%.06f", s.queueDelayMin.Seconds())
+       s.QueueDelayMax = fmt.Sprintf("%.06f", s.queueDelayMax.Seconds())
        return &s
 }
 
 func (h *handler) setup() {
-       h.queues = make(map[chan interface{}]struct{})
+       h.lastDelay = make(map[chan interface{}]time.Duration)
 }
index b6b064e49c2196a1b1b96ec5115a17913ce09d0c..dc899c56c0e63c43674ed52f464f859958a286f2 100644 (file)
@@ -6,6 +6,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/lib/pq"
@@ -39,6 +40,8 @@ type pgEventSource struct {
        shutdown   chan error
 
        lastQDelay time.Duration
+       eventsIn   uint64
+       eventsOut  uint64
 }
 
 func (ps *pgEventSource) setup() {
@@ -89,9 +92,11 @@ func (ps *pgEventSource) run() {
                                WithField("serial", e.Serial).
                                WithField("detail", e.Detail()).
                                Debug("event ready")
-                       ps.lastQDelay = time.Now().Sub(e.Received)
+                       e.Ready = time.Now()
+                       ps.lastQDelay = e.Ready.Sub(e.Received)
 
                        ps.mtx.Lock()
+                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
                        for sink := range ps.sinks {
                                sink.channel <- e
                        }
@@ -136,6 +141,7 @@ func (ps *pgEventSource) run() {
                                db:       ps.db,
                        }
                        logger(nil).WithField("event", e).Debug("incoming")
+                       atomic.AddUint64(&ps.eventsIn, 1)
                        ps.queue <- e
                        go e.Detail()
                }
@@ -174,6 +180,8 @@ func (ps *pgEventSource) Status() interface{} {
                blocked += len(sink.channel)
        }
        return map[string]interface{}{
+               "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
+               "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
                "Queue":        len(ps.queue),
                "QueueLimit":   cap(ps.queue),
                "QueueDelay":   fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),
index 18eaf73684087244cb16c655a5e06df5ad3b1813..073a398a29a0d88480bb3c1f52a5848abf4f2fcc 100644 (file)
@@ -37,7 +37,8 @@ type router struct {
 }
 
 type routerStatus struct {
-       Connections int64
+       ReqsReceived int64
+       ReqsActive   int64
 }
 
 type Statuser interface {
@@ -97,7 +98,7 @@ func (rtr *router) Status() interface{} {
                "Outgoing": rtr.handler.Status(),
        }
        if es, ok := rtr.eventSource.(Statuser); ok {
-               s["Incoming"] = es.Status()
+               s["EventSource"] = es.Status()
        }
        return s
 }
@@ -115,8 +116,9 @@ func (rtr *router) serveStatus(resp http.ResponseWriter, req *http.Request) {
 
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        rtr.setupOnce.Do(rtr.setup)
-       atomic.AddInt64(&rtr.status.Connections, 1)
-       defer atomic.AddInt64(&rtr.status.Connections, -1)
+       atomic.AddInt64(&rtr.status.ReqsReceived, 1)
+       atomic.AddInt64(&rtr.status.ReqsActive, 1)
+       defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
 
        logger := logger(req.Context()).
                WithField("RequestID", rtr.newReqID())
index a60a4a382fd781c368c31fe8ea9cc8533a6b2867..9a9707bb64e6d89a32833486a4c6c445e4ea8db3 100644 (file)
@@ -164,9 +164,11 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
                }
+               now := time.Now()
                e := &event{
                        LogID:    id,
-                       Received: time.Now(),
+                       Received: now,
+                       Ready:    now,
                        db:       sess.db,
                }
                if sub.match(sess, e) {