// pgConfig is a set of string-keyed, string-valued settings. Its
// ConnectionString method returns them rendered as a single string.
// NOTE(review): presumably these are PostgreSQL connection parameters
// (the file opens a "postgres" database elsewhere) -- confirm against
// the code that populates this map.
15 type pgConfig map[string]string
17 func (c pgConfig) ConnectionString() string {
23 strings.Replace(v, `\`, `\\`, -1),
30 type pgEventSource struct {
35 pqListener *pq.Listener
36 sinks map[*pgEventSink]bool
42 func (ps *pgEventSource) setup() {
43 ps.shutdown = make(chan error, 1)
44 ps.sinks = make(map[*pgEventSink]bool)
46 db, err := sql.Open("postgres", ps.DataSource)
48 log.Fatalf("sql.Open: %s", err)
50 if err = db.Ping(); err != nil {
51 log.Fatalf("db.Ping: %s", err)
55 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
57 // Until we have a mechanism for catching up
58 // on missed events, we cannot recover from a
59 // dropped connection without breaking our
60 // promises to clients.
61 ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
64 err = ps.pqListener.Listen("logs")
68 debugLogf("pgEventSource listening")
73 func (ps *pgEventSource) run() {
74 eventQueue := make(chan *event, ps.QueueSize)
77 for e := range eventQueue {
78 // Wait for the "select ... from logs" call to
79 // finish. This limits max concurrent queries
80 // to ps.QueueSize. Without this, max
81 // concurrent queries would be bounded by
82 // client_count X client_queue_size.
84 debugLogf("event %d detail %+v", e.Serial, e.Detail())
86 for sink := range ps.sinks {
94 ticker := time.NewTicker(time.Minute)
98 case err, ok := <-ps.shutdown:
100 debugLogf("shutdown on error: %s", err)
106 debugLogf("pgEventSource listener ping")
109 case pqEvent, ok := <-ps.pqListener.Notify:
114 if pqEvent.Channel != "logs" {
117 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
119 log.Printf("bad notify payload: %+v", pqEvent)
125 Received: time.Now(),
129 debugLogf("event %d %+v", e.Serial, e)
136 // NewSink subscribes to the event source. NewSink returns an
137 // eventSink, whose Channel() method returns a channel: a pointer to
138 // each subsequent event will be sent to that channel.
140 // The caller must ensure events are received from the sink channel as
141 // quickly as possible because when one sink stops being ready, all
142 // other sinks block.
143 func (ps *pgEventSource) NewSink() eventSink {
144 ps.setupOnce.Do(ps.setup)
145 sink := &pgEventSink{
146 channel: make(chan *event, 1),
150 ps.sinks[sink] = true
155 type pgEventSink struct {
157 source *pgEventSource
160 func (sink *pgEventSink) Channel() <-chan *event {
164 func (sink *pgEventSink) Stop() {
166 // Ensure this sink cannot fill up and block the
167 // server-side queue (which otherwise could in turn
168 // block our mtx.Lock() here)
169 for _ = range sink.channel {
172 sink.source.mtx.Lock()
173 delete(sink.source.sinks, sink)
174 sink.source.mtx.Unlock()