// pgConfig is a set of string-keyed, string-valued settings. Its
// ConnectionString method returns a string form of it.
14 type pgConfig map[string]string
// ConnectionString returns the config in string form.
// NOTE(review): v is presumably a map value being escaped for use in a
// connection string -- confirm against the full loop body.
16 func (c pgConfig) ConnectionString() string {
// Double every backslash in v (-1 means replace all occurrences).
22 strings.Replace(v, `\`, `\\`, -1),
// pgEventSource holds a pq listener and the set of sinks subscribed
// through NewSink.
29 type pgEventSource struct {
// NOTE(review): run() keeps its listener in a local variable; confirm
// whether this field is actually assigned or read anywhere.
33 pqListener *pq.Listener
// Subscribed sinks, used as a set: NewSink stores true for each new
// sink and pgEventSink.Stop deletes the entry while holding mtx.
34 sinks map[*pgEventSink]bool
// setup allocates the sinks map. NewSink runs it through setupOnce
// before writing to the map.
39 func (ps *pgEventSource) setup() {
40 ps.sinks = make(map[*pgEventSink]bool)
// run connects to the database named by ps.DataSource, listens for
// notifications on the "logs" channel, and parses each notification
// payload as a decimal log ID. Events pass through a queue of capacity
// ps.QueueSize; for each queued event, ps.sinks is ranged over.
//
// Failure to open or ping the database is fatal to the process
// (log.Fatalf), and the listener callback also calls log.Fatalf.
44 func (ps *pgEventSource) run() {
45 db, err := sql.Open("postgres", ps.DataSource)
47 log.Fatalf("sql.Open: %s", err)
// Ping to verify the database can actually be reached.
49 if err = db.Ping(); err != nil {
50 log.Fatalf("db.Ping: %s", err)
// The two durations are pq's minimum and maximum reconnect intervals.
53 listener := pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
55 // Until we have a mechanism for catching up
56 // on missed events, we cannot recover from a
57 // dropped connection without breaking our
58 // promises to clients.
59 log.Fatalf("pgEventSource listener problem: %s", err)
62 err = listener.Listen("logs")
67 debugLogf("pgEventSource listening")
// Buffered queue between the notification receiver and the loop
// below that consumes events.
69 eventQueue := make(chan *event, ps.QueueSize)
72 for e := range eventQueue {
73 // Wait for the "select ... from logs" call to
74 // finish. This limits max concurrent queries
75 // to ps.QueueSize. Without this, max
76 // concurrent queries would be bounded by
77 // client_count X client_queue_size.
79 debugLogf("event %d detail %+v", e.Serial, e.Detail())
81 for sink := range ps.sinks {
// NOTE(review): presumably drives the once-a-minute listener ping
// logged below -- confirm.
89 ticker := time.NewTicker(time.Minute)
94 debugLogf("pgEventSource listener ping")
97 case pqEvent, ok := <-listener.Notify:
// Check for notifications from any channel other than "logs".
102 if pqEvent.Channel != "logs" {
// The payload is expected to be a base-10 unsigned 64-bit log ID.
105 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
107 log.Printf("bad notify payload: %+v", pqEvent)
113 Received: time.Now(),
117 debugLogf("event %d %+v", e.Serial, e)
124 // NewSink subscribes to the event source. NewSink returns an
125 // eventSink, whose Channel() method returns a channel: a pointer to
126 // each subsequent event will be sent to that channel.
128 // The caller must ensure events are received from the sink channel as
129 // quickly as possible because when one sink stops being ready, all
130 // other sinks block.
131 func (ps *pgEventSource) NewSink() eventSink {
// Make sure the sinks map exists before it is written to below.
132 ps.setupOnce.Do(ps.setup)
133 sink := &pgEventSink{
// Buffered channel with room for one pending event.
134 channel: make(chan *event, 1),
// Register the sink so it is included when run() ranges over ps.sinks.
138 ps.sinks[sink] = true
// pgEventSink is the sink type created by pgEventSource.NewSink.
143 type pgEventSink struct {
// The event source this sink is registered with; Stop uses it to
// remove the sink from the source's sinks map.
145 source *pgEventSource
// Channel returns a receive-only channel of events.
148 func (sink *pgEventSink) Channel() <-chan *event {
// Stop unsubscribes the sink: it deletes the sink from its source's
// sinks map while holding the source's mtx. Events still being sent to
// the sink's channel are drained so that the sender cannot get stuck on
// this sink in the meantime.
152 func (sink *pgEventSink) Stop() {
154 // Ensure this sink cannot fill up and block the
155 // server-side queue (which otherwise could in turn
156 // block our mtx.Lock() here)
// NOTE(review): "for _ = range" can be written "for range"; gofmt -s
// applies this simplification.
157 for _ = range sink.channel {
160 sink.source.mtx.Lock()
161 delete(sink.source.sinks, sink)
162 sink.source.mtx.Unlock()