"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/lib/pq"
shutdown chan error
lastQDelay time.Duration
+ eventsIn uint64
+ eventsOut uint64
}
func (ps *pgEventSource) setup() {
WithField("serial", e.Serial).
WithField("detail", e.Detail()).
Debug("event ready")
- ps.lastQDelay = time.Now().Sub(e.Received)
+ e.Ready = time.Now()
+ ps.lastQDelay = e.Ready.Sub(e.Received)
ps.mtx.Lock()
+ atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
for sink := range ps.sinks {
sink.channel <- e
}
db: ps.db,
}
logger(nil).WithField("event", e).Debug("incoming")
+ atomic.AddUint64(&ps.eventsIn, 1)
ps.queue <- e
go e.Detail()
}
blocked += len(sink.channel)
}
return map[string]interface{}{
+ "EventsIn": atomic.LoadUint64(&ps.eventsIn),
+ "EventsOut": atomic.LoadUint64(&ps.eventsOut),
"Queue": len(ps.queue),
"QueueLimit": cap(ps.queue),
"QueueDelay": fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),