12 "git.curoverse.com/arvados.git/sdk/go/stats"
// pgConfig is a map of string keys to string values. Its
// ConnectionString method renders it as a single string.
16 type pgConfig map[string]string
// ConnectionString renders c as a string. While doing so, every
// backslash in v is doubled (`\` becomes `\\`).
// NOTE(review): v is presumably each map value, and the rest of the
// quoting/formatting is not visible in this excerpt -- confirm the full
// escaping rules before relying on them.
18 func (c pgConfig) ConnectionString() string {
24 strings.Replace(v, `\`, `\\`, -1),
// pgEventSource holds the state used by Run, NewSink and DebugStatus
// to receive notifications and hand events to sinks.
31 type pgEventSource struct {
// pqListener is assigned in Run from pq.NewListener.
36 pqListener *pq.Listener
// sinks is used as a set: NewSink adds an entry and pgEventSink.Stop
// deletes it.
38 sinks map[*pgEventSink]bool
// lastQDelay is the most recent e.Ready.Sub(e.Received) computed in
// Run; DebugStatus reports it as "QueueDelay".
41 lastQDelay time.Duration
// Compile-time assertion that *pgEventSource satisfies debugStatuser.
51 var _ debugStatuser = (*pgEventSource)(nil)
// listenerProblem is the event callback that Run passes to
// pq.NewListener. For pq.ListenerEventConnected it logs a debug
// message. The error-level "listener problem" log, which carries the
// event type in the "eventType" field, appears to cover the remaining
// event types.
// NOTE(review): the lines between the two log calls, and whatever
// follows the error log, are not visible here -- confirm the control
// flow before changing it.
53 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
54 if et == pq.ListenerEventConnected {
55 logger(nil).Debug("pgEventSource connected")
59 // Until we have a mechanism for catching up on missed events,
60 // we cannot recover from a dropped connection without
61 // breaking our promises to clients.
63 WithField("eventType", et).
65 Error("listener problem")
// setup creates the ps.ready channel. WaitReady and Run both invoke it
// through ps.setupOnce.Do.
69 func (ps *pgEventSource) setup() {
70 ps.ready = make(chan bool)
73 // Close stops listening for new events and disconnects all clients.
74 func (ps *pgEventSource) Close() {
79 // WaitReady returns when the event listener is connected.
80 func (ps *pgEventSource) WaitReady() {
// setup creates ps.ready; going through setupOnce lets WaitReady and
// Run share that initialization regardless of which is called first.
// NOTE(review): assumes setupOnce is a sync.Once -- confirm.
81 ps.setupOnce.Do(ps.setup)
85 // Run listens for event notifications on the "logs" channel and sends
86 // them to all subscribers.
//
// NOTE(review): only part of this function is visible in this excerpt.
// The added comments describe the visible statements only; control
// flow between them (returns, goroutines, locking) must be confirmed
// against the full file.
87 func (ps *pgEventSource) Run() {
88 logger(nil).Debug("pgEventSource Run starting")
89 defer logger(nil).Debug("pgEventSource Run finished")
// Same initialization that WaitReady performs; setup creates ps.ready.
91 ps.setupOnce.Do(ps.setup)
99 ctx, cancel := context.WithCancel(context.Background())
104 // Disconnect all clients
106 for sink := range ps.sinks {
// Open a database handle for ps.DataSource and check it with Ping;
// failures of either step are logged at error level.
113 db, err := sql.Open("postgres", ps.DataSource)
115 logger(nil).WithError(err).Error("sql.Open failed")
118 if err = db.Ping(); err != nil {
119 logger(nil).WithError(err).Error("db.Ping failed")
// Create the listener with ps.listenerProblem as its event callback.
// In pq.NewListener the two durations are the minimum and maximum
// reconnect intervals. Then subscribe to the "logs" channel.
124 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
125 err = ps.pqListener.Listen("logs")
127 logger(nil).WithError(err).Error("pq Listen failed")
130 defer ps.pqListener.Close()
131 logger(nil).Debug("pq Listen setup done")
134 // Avoid double-close in deferred func
// The queue is buffered with capacity ps.QueueSize and is closed when
// Run returns.
137 ps.queue = make(chan *event, ps.QueueSize)
138 defer close(ps.queue)
141 for e := range ps.queue {
142 // Wait for the "select ... from logs" call to
143 // finish. This limits max concurrent queries
144 // to ps.QueueSize. Without this, max
145 // concurrent queries would be bounded by
146 // client_count X client_queue_size.
150 WithField("serial", e.Serial).
151 WithField("detail", e.Detail()).
// Record the time between Received and Ready for this event;
// DebugStatus reports it as "QueueDelay".
154 ps.lastQDelay = e.Ready.Sub(e.Received)
// Count one outgoing event per currently registered sink.
157 atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
158 for sink := range ps.sinks {
// One-minute ticker.
// NOTE(review): presumably this drives the "listener ping" log below
// -- the select arms are not fully visible; confirm.
166 ticker := time.NewTicker(time.Minute)
171 logger(nil).Debug("ctx done")
175 logger(nil).Debug("listener ping")
178 case pqEvent, ok := <-ps.pqListener.Notify:
180 logger(nil).Debug("pqListener Notify chan closed")
184 // pq should call listenerProblem
185 // itself in addition to sending us a
186 // nil event, so this might be
188 ps.listenerProblem(-1, nil)
// Notifications from any channel other than "logs" are reported as
// errors.
191 if pqEvent.Channel != "logs" {
192 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
// The notification payload (pqEvent.Extra) is parsed as a base-10
// unsigned 64-bit integer.
195 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
197 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
203 Received: time.Now(),
207 logger(nil).WithField("event", e).Debug("incoming")
208 atomic.AddUint64(&ps.eventsIn, 1)
215 // NewSink subscribes to the event source. NewSink returns an
216 // eventSink, whose Channel() method returns a channel: a pointer to
217 // each subsequent event will be sent to that channel.
219 // The caller must ensure events are received from the sink channel as
220 // quickly as possible because when one sink stops being ready, all
221 // other sinks block.
222 func (ps *pgEventSource) NewSink() eventSink {
// Each sink gets its own channel with a buffer of one event.
223 sink := &pgEventSink{
224 channel: make(chan *event, 1),
// Create the sinks map, then register the new sink in it.
// NOTE(review): the map creation is presumably guarded by a nil check
// and done under ps.mtx -- neither is visible here; confirm.
229 ps.sinks = make(map[*pgEventSink]bool)
231 ps.sinks[sink] = true
// DB returns a *sql.DB.
// NOTE(review): presumably this is the handle Run opens with sql.Open;
// the body is not visible in this excerpt -- confirm, including
// whether it waits for the source to be ready first.
236 func (ps *pgEventSource) DB() *sql.DB {
// DebugStatus returns a map describing the current state of the event
// source: the eventsIn and eventsOut counters, the length and capacity
// of ps.queue, the last recorded queue delay, the number of sinks, and
// "SinksBlocked".
240 func (ps *pgEventSource) DebugStatus() interface{} {
// NOTE(review): the matching ps.mtx.Lock() is not visible in this
// excerpt; presumably it is on the line just before this defer.
242 defer ps.mtx.Unlock()
// blocked accumulates the number of events currently buffered in the
// sinks' channels (len of each sink.channel).
244 for sink := range ps.sinks {
245 blocked += len(sink.channel)
247 return map[string]interface{}{
248 "EventsIn": atomic.LoadUint64(&ps.eventsIn),
249 "EventsOut": atomic.LoadUint64(&ps.eventsOut),
250 "Queue": len(ps.queue),
251 "QueueLimit": cap(ps.queue),
252 "QueueDelay": stats.Duration(ps.lastQDelay),
253 "Sinks": len(ps.sinks),
254 "SinksBlocked": blocked,
// pgEventSink is the sink type created by pgEventSource.NewSink and
// tracked in the source's sinks map.
258 type pgEventSink struct {
// source is the pgEventSource whose mtx and sinks map Stop uses to
// unregister this sink.
260 source *pgEventSource
// Channel returns a receive-only channel of *event. Per the NewSink
// documentation, a pointer to each subsequent event is sent to it.
263 func (sink *pgEventSink) Channel() <-chan *event {
267 // Stop sending events to the sink's channel.
268 func (sink *pgEventSink) Stop() {
270 // Ensure this sink cannot fill up and block the
271 // server-side queue (which otherwise could in turn
272 // block our mtx.Lock() here)
// Receives from sink.channel until it is closed, ignoring each value.
// NOTE(review): gofmt -s would write this as `for range sink.channel`;
// the loop body and its enclosing statement are not visible here.
273 for _ = range sink.channel {
// Remove this sink from the source's sinks map while holding the
// source mutex; the delete is skipped if the sink is not registered.
276 sink.source.mtx.Lock()
277 if _, ok := sink.source.sinks[sink]; ok {
278 delete(sink.source.sinks, sink)
281 sink.source.mtx.Unlock()