import (
	"database/sql"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/lib/pq"
)
type pgEventSource struct {
- PgConfig pgConfig
- QueueSize int
+ DataSource string
+ QueueSize int
+ db *sql.DB
pqListener *pq.Listener
+ queue chan *event
sinks map[*pgEventSink]bool
setupOnce sync.Once
mtx sync.Mutex
+ shutdown chan error
+
+ lastQDelay time.Duration
+ eventsIn uint64
+ eventsOut uint64
}
func (ps *pgEventSource) setup() {
+ ps.shutdown = make(chan error, 1)
ps.sinks = make(map[*pgEventSink]bool)
- go ps.run()
-}
-func (ps *pgEventSource) run() {
- db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
+ db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
- log.Fatal(err)
+ logger(nil).WithError(err).Fatal("sql.Open failed")
}
+ if err = db.Ping(); err != nil {
+ logger(nil).WithError(err).Fatal("db.Ping failed")
+ }
+ ps.db = db
- listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
if err != nil {
// Until we have a mechanism for catching up
// on missed events, we cannot recover from a
// dropped connection without breaking our
// promises to clients.
- log.Fatal(err)
+ logger(nil).WithError(err).Error("listener problem")
+ ps.shutdown <- err
}
})
- err = listener.Listen("logs")
+ err = ps.pqListener.Listen("logs")
if err != nil {
- log.Fatal(err)
+ logger(nil).WithError(err).Fatal("pq Listen failed")
}
- go func() {
- for _ = range time.NewTicker(time.Minute).C {
- listener.Ping()
- }
- }()
+ logger(nil).Debug("pgEventSource listening")
+
+ go ps.run()
+}
+
+func (ps *pgEventSource) run() {
+ ps.queue = make(chan *event, ps.QueueSize)
- eventQueue := make(chan *event, ps.QueueSize)
go func() {
- for e := range eventQueue {
+ for e := range ps.queue {
// Wait for the "select ... from logs" call to
// finish. This limits max concurrent queries
// to ps.QueueSize. Without this, max
// concurrent queries would be bounded by
// client_count X client_queue_size.
e.Detail()
- debugLogf("%+v", e)
+
+ logger(nil).
+ WithField("serial", e.Serial).
+ WithField("detail", e.Detail()).
+ Debug("event ready")
+ e.Ready = time.Now()
+ ps.lastQDelay = e.Ready.Sub(e.Received)
+
ps.mtx.Lock()
+ atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
for sink := range ps.sinks {
sink.channel <- e
}
}()
var serial uint64
- for pqEvent := range listener.Notify {
- if pqEvent.Channel != "logs" {
- continue
- }
- serial++
- e := &event{
- LogUUID: pqEvent.Extra,
- Received: time.Now(),
- Serial: serial,
- db: db,
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case err, ok := <-ps.shutdown:
+ if ok {
+ logger(nil).WithError(err).Info("shutdown")
+ }
+ close(ps.queue)
+ return
+
+ case <-ticker.C:
+ logger(nil).Debug("listener ping")
+ ps.pqListener.Ping()
+
+ case pqEvent, ok := <-ps.pqListener.Notify:
+ if !ok {
+ close(ps.queue)
+ return
+ }
+ if pqEvent.Channel != "logs" {
+ continue
+ }
+ logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+ if err != nil {
+ logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+ continue
+ }
+ serial++
+ e := &event{
+ LogID: logID,
+ Received: time.Now(),
+ Serial: serial,
+ db: ps.db,
+ }
+ logger(nil).WithField("event", e).Debug("incoming")
+ atomic.AddUint64(&ps.eventsIn, 1)
+ ps.queue <- e
+ go e.Detail()
}
- debugLogf("%+v", e)
- eventQueue <- e
- go e.Detail()
}
}
-// NewSink subscribes to the event source. If c is not nil, it will be
-// used as the event channel. Otherwise, a new channel will be
-// created. Either way, the sink channel will be returned by the
-// Channel() method of the returned eventSink. All subsequent events
-// will be sent to the sink channel. The caller must ensure events are
-// received from the sink channel as quickly as possible: when one
-// sink blocks, all other sinks also block.
-func (ps *pgEventSource) NewSink(c chan *event) eventSink {
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
ps.setupOnce.Do(ps.setup)
- if c == nil {
- c = make(chan *event, 1)
- }
sink := &pgEventSink{
- channel: c,
+ channel: make(chan *event, 1),
source: ps,
}
ps.mtx.Lock()
return sink
}
+func (ps *pgEventSource) DB() *sql.DB {
+ ps.setupOnce.Do(ps.setup)
+ return ps.db
+}
+
+func (ps *pgEventSource) Status() interface{} {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+ blocked := 0
+ for sink := range ps.sinks {
+ blocked += len(sink.channel)
+ }
+ return map[string]interface{}{
+ "EventsIn": atomic.LoadUint64(&ps.eventsIn),
+ "EventsOut": atomic.LoadUint64(&ps.eventsOut),
+ "Queue": len(ps.queue),
+ "QueueLimit": cap(ps.queue),
+ "QueueDelay": fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),
+ "Sinks": len(ps.sinks),
+ "SinksBlocked": blocked,
+ }
+}
+
type pgEventSink struct {
channel chan *event
source *pgEventSource