import (
"database/sql"
- "fmt"
- "log"
"strconv"
"strings"
"sync"
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
- log.Fatalf("sql.Open: %s", err)
+ logger(nil).WithError(err).Fatal("sql.Open failed")
}
if err = db.Ping(); err != nil {
- log.Fatalf("db.Ping: %s", err)
+ logger(nil).WithError(err).Fatal("db.Ping failed")
}
ps.db = db
// on missed events, we cannot recover from a
// dropped connection without breaking our
// promises to clients.
- ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+ logger(nil).WithError(err).Error("listener problem")
+ ps.shutdown <- err
}
})
err = ps.pqListener.Listen("logs")
if err != nil {
- log.Fatal(err)
+ logger(nil).WithError(err).Fatal("pq Listen failed")
}
- debugLogf("pgEventSource listening")
+ logger(nil).Debug("pgEventSource listening")
go ps.run()
}
// concurrent queries would be bounded by
// client_count X client_queue_size.
e.Detail()
- debugLogf("event %d detail %+v", e.Serial, e.Detail())
+
+ logger(nil).
+ WithField("serial", e.Serial).
+ WithField("detail", e.Detail()).
+ Debug("event ready")
+
ps.mtx.Lock()
for sink := range ps.sinks {
sink.channel <- e
select {
case err, ok := <-ps.shutdown:
if ok {
- debugLogf("shutdown on error: %s", err)
+ logger(nil).WithError(err).Info("shutdown")
}
close(eventQueue)
return
case <-ticker.C:
- debugLogf("pgEventSource listener ping")
+ logger(nil).Debug("listener ping")
ps.pqListener.Ping()
case pqEvent, ok := <-ps.pqListener.Notify:
}
logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
if err != nil {
- log.Printf("bad notify payload: %+v", pqEvent)
+ logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
continue
}
serial++
Serial: serial,
db: ps.db,
}
- debugLogf("event %d %+v", e.Serial, e)
+ logger(nil).WithField("event", e).Debug("incoming")
eventQueue <- e
go e.Detail()
}
return sink
}
+// DB returns the pgEventSource's shared *sql.DB handle, lazily
+// running setup exactly once (via setupOnce) before first use so the
+// connection is established on demand.
+func (ps *pgEventSource) DB() *sql.DB {
+ ps.setupOnce.Do(ps.setup)
+ return ps.db
+}
+
type pgEventSink struct {
channel chan *event
source *pgEventSource