14 type pgConfig map[string]string
16 func (c pgConfig) ConnectionString() string {
22 strings.Replace(v, `\`, `\\`, -1),
29 type pgEventSource struct {
33 pqListener *pq.Listener
34 sinks map[*pgEventSink]bool
39 func (ps *pgEventSource) setup() {
40 ps.sinks = make(map[*pgEventSink]bool)
44 func (ps *pgEventSource) run() {
45 db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
50 listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
52 // Until we have a mechanism for catching up
53 // on missed events, we cannot recover from a
54 // dropped connection without breaking our
55 // promises to clients.
56 log.Fatalf("pgEventSource listener problem: %s", err)
59 err = listener.Listen("logs")
64 debugLogf("pgEventSource listening")
66 eventQueue := make(chan *event, ps.QueueSize)
69 for e := range eventQueue {
70 // Wait for the "select ... from logs" call to
71 // finish. This limits max concurrent queries
72 // to ps.QueueSize. Without this, max
73 // concurrent queries would be bounded by
74 // client_count X client_queue_size.
76 debugLogf("event %d detail %+v", e.Serial, e.Detail())
78 for sink := range ps.sinks {
86 ticker := time.NewTicker(time.Minute)
91 debugLogf("pgEventSource listener ping")
94 case pqEvent, ok := <-listener.Notify:
99 if pqEvent.Channel != "logs" {
102 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
104 log.Printf("bad notify payload: %+v", pqEvent)
110 Received: time.Now(),
114 debugLogf("event %d %+v", e.Serial, e)
121 // NewSink subscribes to the event source. NewSink returns an
122 // eventSink, whose Channel() method returns a channel: a pointer to
123 // each subsequent event will be sent to that channel.
125 // The caller must ensure events are received from the sink channel as
126 // quickly as possible because when one sink stops being ready, all
127 // other sinks block.
128 func (ps *pgEventSource) NewSink() eventSink {
129 ps.setupOnce.Do(ps.setup)
130 sink := &pgEventSink{
131 channel: make(chan *event, 1),
135 ps.sinks[sink] = true
140 type pgEventSink struct {
142 source *pgEventSource
145 func (sink *pgEventSink) Channel() <-chan *event {
149 func (sink *pgEventSink) Stop() {
151 // Ensure this sink cannot fill up and block the
152 // server-side queue (which otherwise could in turn
153 // block our mtx.Lock() here)
154 for _ = range sink.channel {
157 sink.source.mtx.Lock()
158 delete(sink.source.sinks, sink)
159 sink.source.mtx.Unlock()