- ps.shutdown = make(chan error, 1)
- ps.sinks = make(map[*pgEventSink]bool)
+ ps.ready = make(chan bool)
+}
+
+// Close stops listening for new events and disconnects all clients.
+//
+// It blocks in WaitReady and then invokes ps.cancel, the cancel function
+// that Run stores for its context. The client disconnect itself is done
+// by Run's deferred cleanup, which closes every sink channel when Run
+// returns.
+//
+// NOTE(review): WaitReady blocks on ps.ready, so Close presumably never
+// returns if Run has not been started -- confirm against callers.
+func (ps *pgEventSource) Close() {
+ // Wait before cancelling: ps.cancel is assigned inside Run, so this
+ // presumably guarantees it is non-nil by the time it is called.
+ ps.WaitReady()
+ ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+//
+// It runs ps.setup through ps.setupOnce (so setup happens at most once,
+// whether WaitReady or Run gets there first) and then blocks on a receive
+// from ps.ready. Run closes that channel on exit if it is still open.
+//
+// NOTE(review): WaitReady therefore also returns when Run exits without
+// having connected -- verify that callers tolerate this.
+func (ps *pgEventSource) WaitReady() {
+ // Make sure the ps.ready channel exists before receiving from it.
+ ps.setupOnce.Do(ps.setup)
+ <-ps.ready
+}
+
+// Run listens for event notifications on the "logs" channel and sends
+// them to all subscribers.
+func (ps *pgEventSource) Run() {
+ logger(nil).Debug("pgEventSource Run starting")
+ defer logger(nil).Debug("pgEventSource Run finished")
+
+ ps.setupOnce.Do(ps.setup)
+ ready := ps.ready
+ defer func() {
+ if ready != nil {
+ close(ready)
+ }
+ }()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ ps.cancel = cancel
+ defer cancel()
+
+ defer func() {
+ // Disconnect all clients
+ ps.mtx.Lock()
+ for sink := range ps.sinks {
+ close(sink.channel)
+ }
+ ps.sinks = nil
+ ps.mtx.Unlock()
+ }()