// pgConfig is a set of PostgreSQL connection parameters as
// key/value pairs, rendered into a connection string by
// ConnectionString.
13 type pgConfig map[string]string
// ConnectionString assembles the config's key/value pairs into a
// libpq-style connection string.
// NOTE(review): the body is partially elided in this view; at
// minimum each value has its backslashes doubled below.
15 func (c pgConfig) ConnectionString() string {
// Escape backslashes in the value so they survive whatever
// quoting is applied when the string is assembled (not visible
// in this view).
21 strings.Replace(v, `\`, `\\`, -1),
// pgEventSource listens for PostgreSQL NOTIFY events on the
// "logs" channel (see run) and fans them out to subscribed
// sinks (see NewSink).
28 type pgEventSource struct {
// pqListener is the dedicated lib/pq LISTEN/NOTIFY connection.
32 pqListener *pq.Listener
// sinks is the set of current subscribers; the bool value is
// always true (map used as a set). Accessed by run, NewSink,
// and pgEventSink.Stop — presumably guarded by the mtx field
// referenced in Stop; field declaration not visible here.
33 sinks map[*pgEventSink]bool
// setup initializes the subscriber set. It runs exactly once,
// via ps.setupOnce.Do(ps.setup) in NewSink.
38 func (ps *pgEventSource) setup() {
39 ps.sinks = make(map[*pgEventSink]bool)
// run is the event source's main loop: it opens a database
// handle, LISTENs on the PostgreSQL "logs" channel, and turns
// each notification into an event delivered to every sink.
// NOTE(review): this view of the file is elided; the comments
// below describe only the visible fragments.
43 func (ps *pgEventSource) run() {
// Handle for follow-up queries, using the same connection
// parameters as the LISTEN connection below.
44 db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
// Dedicated LISTEN/NOTIFY connection. Reconnect backoff starts
// at one second and caps at one minute; the callback receives
// connection state changes and errors from lib/pq.
49 listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
51 // Until we have a mechanism for catching up
52 // on missed events, we cannot recover from a
53 // dropped connection without breaking our
54 // promises to clients.
// Subscribe to the "logs" notification channel.
58 err = listener.Listen("logs")
// Once-a-minute periodic work — presumably a keepalive/ping on
// the listener connection; body not visible, TODO confirm.
63 for _ = range time.NewTicker(time.Minute).C {
// Buffered queue decouples NOTIFY delivery from per-event
// database work; its capacity is ps.QueueSize.
68 eventQueue := make(chan *event, ps.QueueSize)
70 for e := range eventQueue {
71 // Wait for the "select ... from logs" call to
72 // finish. This limits max concurrent queries
73 // to ps.QueueSize. Without this, max
74 // concurrent queries would be bounded by
75 // client_count X client_queue_size.
// Fan each event out to every subscribed sink.
79 for sink := range ps.sinks {
// Main receive loop: each NOTIFY payload becomes an event.
87 for pqEvent := range listener.Notify {
// Ignore notifications arriving on any channel other than
// "logs".
88 if pqEvent.Channel != "logs" {
// The NOTIFY payload (pqEvent.Extra) carries the log record's
// UUID.
93 LogUUID: pqEvent.Extra,
104 // NewSink subscribes to the event source. If c is not nil, it will be
105 // used as the event channel. Otherwise, a new channel will be
106 // created. Either way, the sink channel will be returned by the
107 // Channel() method of the returned eventSink. All subsequent events
108 // will be sent to the sink channel. The caller must ensure events are
109 // received from the sink channel as quickly as possible: when one
110 // sink blocks, all other sinks also block.
111 func (ps *pgEventSource) NewSink(c chan *event) eventSink {
// Lazily initialize the sink set exactly once, no matter how
// many sinks are created.
112 ps.setupOnce.Do(ps.setup)
// Caller supplied no channel: create one with a buffer of 1.
114 c = make(chan *event, 1)
116 sink := &pgEventSink{
// Register the sink so run()'s fan-out loop delivers to it.
121 ps.sinks[sink] = true
// pgEventSink is a single subscription to a pgEventSource.
126 type pgEventSink struct {
// source is the pgEventSource this sink is registered with;
// Stop() uses it to unregister.
128 source *pgEventSource
// Channel returns the receive-only channel on which this sink's
// events are delivered (the same channel Stop drains).
131 func (sink *pgEventSink) Channel() <-chan *event {
// Stop unsubscribes the sink from its event source, draining
// its channel so the source's fan-out cannot block on it.
135 func (sink *pgEventSink) Stop() {
137 // Ensure this sink cannot fill up and block the
138 // server-side queue (which otherwise could in turn
139 // block our mtx.Lock() here)
// Drain any events already queued (or still arriving) on this
// sink's channel. NOTE(review): the loop body / close are not
// visible in this view; presumably the drain runs until the
// channel is closed after unregistration — TODO confirm.
140 for _ = range sink.channel {
// Unregister under the source's lock so run()'s fan-out loop
// stops delivering to this sink.
143 sink.source.mtx.Lock()
144 delete(sink.source.sinks, sink)
145 sink.source.mtx.Unlock()