type event struct {
LogID uint64
Received time.Time
+ Ready time.Time
Serial uint64
db *sql.DB
import (
"context"
+ "fmt"
"io"
"sync"
"time"
QueueSize int
mtx sync.Mutex
- queues map[chan interface{}]struct{}
+ lastDelay map[chan interface{}]time.Duration
setupOnce sync.Once
}
queue := make(chan interface{}, h.QueueSize)
h.mtx.Lock()
- h.queues[queue] = struct{}{}
+ h.lastDelay[queue] = 0
h.mtx.Unlock()
defer func() {
h.mtx.Lock()
- delete(h.queues, queue)
+ delete(h.lastDelay, queue)
h.mtx.Unlock()
}()
log.Debug("sent")
if e != nil {
- stats.QueueDelayNs += t0.Sub(e.Received)
+ stats.QueueDelayNs += t0.Sub(e.Ready)
+ h.mtx.Lock()
+ h.lastDelay[queue] = time.Since(e.Ready)
+ h.mtx.Unlock()
}
stats.WriteDelayNs += time.Since(t0)
stats.EventBytes += uint64(len(buf))
defer h.mtx.Unlock()
var s struct {
- QueueCount int
- QueueMax int
- QueueTotal uint64
+ QueueCount int
+ QueueMin int
+ QueueMax int
+ QueueTotal uint64
+ queueDelayMin time.Duration
+ QueueDelayMin string
+ queueDelayMax time.Duration
+ QueueDelayMax string
}
- for q := range h.queues {
+ for q, lastDelay := range h.lastDelay {
+ s.QueueCount++
n := len(q)
s.QueueTotal += uint64(n)
if s.QueueMax < n {
s.QueueMax = n
}
+ if s.QueueMin > n || s.QueueCount == 1 {
+ s.QueueMin = n
+ }
+ if (s.queueDelayMin > lastDelay || s.queueDelayMin == 0) && lastDelay > 0 {
+ s.queueDelayMin = lastDelay
+ }
+ if s.queueDelayMax < lastDelay {
+ s.queueDelayMax = lastDelay
+ }
}
- s.QueueCount = len(h.queues)
+ s.QueueDelayMin = fmt.Sprintf("%.06f", s.queueDelayMin.Seconds())
+ s.QueueDelayMax = fmt.Sprintf("%.06f", s.queueDelayMax.Seconds())
return &s
}
// setup initializes the per-queue delay map. It must run exactly once
// before any subscriber registers a queue; callers arrange this via
// h.setupOnce.Do(h.setup).
func (h *handler) setup() {
	h.lastDelay = map[chan interface{}]time.Duration{}
}
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/lib/pq"
shutdown chan error
lastQDelay time.Duration
+ eventsIn uint64
+ eventsOut uint64
}
func (ps *pgEventSource) setup() {
WithField("serial", e.Serial).
WithField("detail", e.Detail()).
Debug("event ready")
- ps.lastQDelay = time.Now().Sub(e.Received)
+ e.Ready = time.Now()
+ ps.lastQDelay = e.Ready.Sub(e.Received)
ps.mtx.Lock()
+ atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
for sink := range ps.sinks {
sink.channel <- e
}
db: ps.db,
}
logger(nil).WithField("event", e).Debug("incoming")
+ atomic.AddUint64(&ps.eventsIn, 1)
ps.queue <- e
go e.Detail()
}
blocked += len(sink.channel)
}
return map[string]interface{}{
+ "EventsIn": atomic.LoadUint64(&ps.eventsIn),
+ "EventsOut": atomic.LoadUint64(&ps.eventsOut),
"Queue": len(ps.queue),
"QueueLimit": cap(ps.queue),
"QueueDelay": fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),
}
// routerStatus holds request counters for the router. Both fields are
// updated with sync/atomic from ServeHTTP, so they must stay 64-bit
// aligned (first fields of the struct) for 32-bit platforms.
type routerStatus struct {
	// ReqsReceived counts every request accepted since startup.
	ReqsReceived int64
	// ReqsActive counts requests currently in flight.
	ReqsActive int64
}
type Statuser interface {
"Outgoing": rtr.handler.Status(),
}
if es, ok := rtr.eventSource.(Statuser); ok {
- s["Incoming"] = es.Status()
+ s["EventSource"] = es.Status()
}
return s
}
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup)
- atomic.AddInt64(&rtr.status.Connections, 1)
- defer atomic.AddInt64(&rtr.status.Connections, -1)
+ atomic.AddInt64(&rtr.status.ReqsReceived, 1)
+ atomic.AddInt64(&rtr.status.ReqsActive, 1)
+ defer atomic.AddInt64(&rtr.status.ReqsActive, -1)
logger := logger(req.Context()).
WithField("RequestID", rtr.newReqID())
// same thing all over again.
time.Sleep(100 * time.Millisecond)
}
+ now := time.Now()
e := &event{
LogID: id,
- Received: time.Now(),
+ Received: now,
+ Ready: now,
db: sess.db,
}
if sub.match(sess, e) {