import (
"database/sql"
+ "fmt"
"log"
"strconv"
"strings"
}
type pgEventSource struct {
- PgConfig pgConfig
- QueueSize int
+ DataSource string // connection string handed to both sql.Open and pq.NewListener in setup
+ QueueSize int // capacity of the eventQueue channel created in run
+ db *sql.DB // opened and pinged in setup; attached to each event and returned by DB()
pqListener *pq.Listener
sinks map[*pgEventSink]bool
setupOnce sync.Once
mtx sync.Mutex
+ shutdown chan error // 1-buffered; the listener callback sends errors here and run's goroutine receives them
}
func (ps *pgEventSource) setup() {
+ ps.shutdown = make(chan error, 1) // buffer of 1: a single error can be queued even when no receiver is ready
ps.sinks = make(map[*pgEventSink]bool)
- go ps.run()
-}
-func (ps *pgEventSource) run() {
- db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
+ db, err := sql.Open("postgres", ps.DataSource) // open the handle on the configured data source
if err != nil {
- log.Fatal(err)
+ log.Fatalf("sql.Open: %s", err)
}
+ if err = db.Ping(); err != nil { // exit the process during setup if the database cannot be pinged
+ log.Fatalf("db.Ping: %s", err)
+ }
+ ps.db = db // keep the handle so events and DB() can share it
- listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { // stored on ps so run's goroutine can Ping it and read its Notify channel
if err != nil {
// Until we have a mechanism for catching up
// on missed events, we cannot recover from a
// dropped connection without breaking our
// promises to clients.
- log.Fatalf("pgEventSource listener problem: %s", err)
+ ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err) // hand the error to run's goroutine instead of exiting here. NOTE(review): this send blocks once the buffer is full and nothing is receiving; confirm the callback cannot report errors repeatedly after run's goroutine has returned
}
})
- err = listener.Listen("logs")
+ err = ps.pqListener.Listen("logs") // listen on the "logs" channel; failure is fatal (checked below)
if err != nil {
log.Fatal(err)
}
-
debugLogf("pgEventSource listening")
+ go ps.run() // start the event loop only after the database handle and listener are in place
+}
+
+func (ps *pgEventSource) run() { // run is started by setup; its goroutine watches ps.shutdown, the ping ticker and listener notifications, and sends events on eventQueue
eventQueue := make(chan *event, ps.QueueSize)
go func() {
defer ticker.Stop()
for {
select {
+ case err, ok := <-ps.shutdown: // ok is true when an error value was sent, false if the channel was closed
+ if ok {
+ debugLogf("shutdown on error: %s", err)
+ }
+ close(eventQueue) // this goroutine will not queue any further events
+ return
+
case <-ticker.C:
debugLogf("pgEventSource listener ping")
- listener.Ping()
+ ps.pqListener.Ping() // the listener is now a field on ps rather than a local variable
- case pqEvent, ok := <-listener.Notify:
+ case pqEvent, ok := <-ps.pqListener.Notify:
if !ok {
close(eventQueue)
return
LogID: logID,
Received: time.Now(),
Serial: serial,
- db: db,
+ db: ps.db, // shared handle opened in setup
}
debugLogf("event %d %+v", e.Serial, e)
eventQueue <- e
return sink
}
+func (ps *pgEventSource) DB() *sql.DB { // DB runs setup at most once (through setupOnce) and returns the database handle that setup stored
+ ps.setupOnce.Do(ps.setup) // setup either assigns ps.db or terminates the process via log.Fatal
+ return ps.db
+}
+
type pgEventSink struct {
channel chan *event
source *pgEventSource