// pgConfig holds Postgres connection parameters as key/value pairs,
// to be rendered into a libpq-style connection string.
13 type pgConfig map[string]string

// ConnectionString serializes the config into a connection string.
// (Listing fragment: most of the body is elided from this view.)
15 func (c pgConfig) ConnectionString() string {
// Escape backslashes in the value so it survives quoting in the
// connection string. NOTE(review): strings.Replace with n=-1
// replaces all occurrences (equivalent to strings.ReplaceAll).
21 strings.Replace(v, `\`, `\\`, -1),
// pgEventSource listens for Postgres NOTIFY events on the "logs"
// channel (see setup/run below) and fans them out to subscribed
// sinks. (Listing fragment: some fields are elided from this view.)
28 type pgEventSource struct {
// pqListener is the underlying LISTEN/NOTIFY connection.
33 pqListener *pq.Listener
// sinks is the set of active subscribers (map used as a set).
34 sinks map[*pgEventSink]bool
// setup initializes the event source: allocates the shutdown channel
// and sink set, opens and pings the database, creates the pq
// listener, and subscribes to the "logs" notification channel.
// Setup failures are logged as fatal. (Listing fragment: some body
// lines are elided from this view.)
40 func (ps *pgEventSource) setup() {
// Buffered (capacity 1) so a shutdown error can be posted without
// blocking the poster.
41 ps.shutdown = make(chan error, 1)
42 ps.sinks = make(map[*pgEventSink]bool)
// Verify database connectivity up front; sql.Open alone does not
// establish a connection, hence the explicit Ping below.
44 db, err := sql.Open("postgres", ps.DataSource)
46 logger(nil).WithError(err).Fatal("sql.Open failed")
48 if err = db.Ping(); err != nil {
49 logger(nil).WithError(err).Fatal("db.Ping failed")
// Listener reconnect backoff: minimum 1s, maximum 1m. The callback
// is invoked on listener connection events.
53 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
55 // Until we have a mechanism for catching up
56 // on missed events, we cannot recover from a
57 // dropped connection without breaking our
58 // promises to clients.
59 logger(nil).WithError(err).Error("listener problem")
// Subscribe to the "logs" NOTIFY channel; fatal on failure.
63 err = ps.pqListener.Listen("logs")
65 logger(nil).WithError(err).Fatal("pq Listen failed")
67 logger(nil).Debug("pgEventSource listening")
// run drives the event pipeline: a bounded queue drained by the loop
// visible below, plus a select loop multiplexing shutdown, a periodic
// ticker, and incoming pq notifications. (Listing fragment: the
// goroutine/select scaffolding between these lines is elided from
// this view.)
72 func (ps *pgEventSource) run() {
// Bounded queue decouples notification intake from event processing.
73 eventQueue := make(chan *event, ps.QueueSize)
76 for e := range eventQueue {
77 // Wait for the "select ... from logs" call to
78 // finish. This limits max concurrent queries
79 // to ps.QueueSize. Without this, max
80 // concurrent queries would be bounded by
81 // client_count X client_queue_size.
85 WithField("serial", e.Serial).
86 WithField("detail", e.Detail()).
// Fan the event out to every subscribed sink.
// NOTE(review): this iteration over ps.sinks presumably happens
// under a lock on elided lines — confirm against the full source.
90 for sink := range ps.sinks {
// Once-a-minute tick, used below to ping the listener connection.
98 ticker := time.NewTicker(time.Minute)
102 case err, ok := <-ps.shutdown:
104 logger(nil).WithError(err).Info("shutdown")
110 logger(nil).Debug("listener ping")
// A NOTIFY arrived (or the Notify channel yielded for another
// reason — the ok flag is checked on elided lines, presumably).
113 case pqEvent, ok := <-ps.pqListener.Notify:
// Ignore notifications from channels other than "logs".
118 if pqEvent.Channel != "logs" {
// The notification payload is expected to be a decimal log ID.
121 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
123 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
// Timestamp the event on arrival.
129 Received: time.Now(),
133 logger(nil).WithField("event", e).Debug("incoming")
140 // NewSink subscribes to the event source. NewSink returns an
141 // eventSink, whose Channel() method returns a channel: a pointer to
142 // each subsequent event will be sent to that channel.
144 // The caller must ensure events are received from the sink channel as
145 // quickly as possible because when one sink stops being ready, all
146 // other sinks block.
147 func (ps *pgEventSource) NewSink() eventSink {
// Lazily initialize the source on first use (see setup).
148 ps.setupOnce.Do(ps.setup)
149 sink := &pgEventSink{
// Buffer of 1 gives the consumer minimal slack before the fan-out
// in run() would block on this sink.
150 channel: make(chan *event, 1),
// Register the sink so run() delivers events to it.
// NOTE(review): this write presumably happens under ps.mtx on
// elided lines — confirm against the full source.
154 ps.sinks[sink] = true
// DB returns the event source's database handle, performing lazy
// one-time setup if it has not run yet. (Listing fragment: the
// return statement is elided from this view.)
159 func (ps *pgEventSource) DB() *sql.DB {
160 ps.setupOnce.Do(ps.setup)
// pgEventSink is a single subscriber's view of the event stream.
// (Listing fragment: some fields, including the event channel, are
// elided from this view.)
164 type pgEventSink struct {
// source is the pgEventSource this sink is registered with; used by
// Stop to deregister.
166 source *pgEventSource

// Channel returns the receive side of the sink's event channel.
// (Listing fragment: the body is elided from this view.)
169 func (sink *pgEventSink) Channel() <-chan *event {
// Stop unsubscribes the sink: it drains the sink's channel so the
// server-side fan-out cannot block on it (the drain presumably runs
// in a goroutine started on an elided line — confirm against the
// full source), then removes the sink from the source's registry
// under the source mutex.
173 func (sink *pgEventSink) Stop() {
175 // Ensure this sink cannot fill up and block the
176 // server-side queue (which otherwise could in turn
177 // block our mtx.Lock() here)
178 for _ = range sink.channel {
// Deregister under lock so the delete cannot race concurrent
// access to sink.source.sinks.
181 sink.source.mtx.Lock()
182 delete(sink.source.sinks, sink)
183 sink.source.mtx.Unlock()