12 "git.curoverse.com/arvados.git/sdk/go/stats"
16 type pgConfig map[string]string
func (c pgConfig) ConnectionString() string {
	s := ""
	for k, v := range c {
		s += k
		s += "='"
		s += strings.Replace(
			strings.Replace(v, `\`, `\\`, -1),
			`'`, `\'`, -1)
		s += "' "
	}
	return s
}

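// Usage sketch (illustrative; the key names are arbitrary examples):
// a pgConfig renders as space-separated key='value' pairs, with
// backslashes and single quotes escaped. Map iteration order, and
// therefore pair order, is not deterministic:
//
//	c := pgConfig{"dbname": "arvados", "password": `a'b\c`}
//	c.ConnectionString() // => `dbname='arvados' password='a\'b\\c' `
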
type pgEventSource struct {
	DataSource string
	QueueSize  int

	db         *sql.DB
	pqListener *pq.Listener
	queue      chan *event
	sinks      map[*pgEventSink]bool
	mtx        sync.Mutex

	lastQDelay time.Duration
	eventsIn   uint64
	eventsOut  uint64
}

var _ debugStatuser = (*pgEventSource)(nil)

func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
	if et == pq.ListenerEventConnected {
		logger(nil).Debug("pgEventSource connected")
		return
	}

	// Until we have a mechanism for catching up on missed events,
	// we cannot recover from a dropped connection without
	// breaking our promises to clients.
	logger(nil).
		WithField("eventType", et).
		WithError(err).
		Error("listener problem")
}

// Run listens for event notifications on the "logs" channel and sends
// them to all subscribers.
func (ps *pgEventSource) Run() {
	logger(nil).Debug("pgEventSource Run starting")
	defer logger(nil).Debug("pgEventSource Run finished")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	defer func() {
		// Disconnect all clients
		ps.mtx.Lock()
		for sink := range ps.sinks {
			close(sink.channel)
		}
		ps.sinks = nil
		ps.mtx.Unlock()
	}()

	db, err := sql.Open("postgres", ps.DataSource)
	if err != nil {
		logger(nil).WithError(err).Fatal("sql.Open failed")
	}
	if err = db.Ping(); err != nil {
		logger(nil).WithError(err).Fatal("db.Ping failed")
	}
	ps.db = db

	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
	err = ps.pqListener.Listen("logs")
	if err != nil {
		logger(nil).WithError(err).Fatal("pq Listen failed")
	}
	defer ps.pqListener.Close()
	logger(nil).Debug("pq Listen setup done")

	ps.queue = make(chan *event, ps.QueueSize)
	defer close(ps.queue)

	go func() {
		for e := range ps.queue {
			// Wait for the "select ... from logs" call to
			// finish. This limits max concurrent queries
			// to ps.QueueSize. Without this, max
			// concurrent queries would be bounded by
			// client_count X client_queue_size.
			e.Detail()

			logger(nil).
				WithField("serial", e.Serial).
				WithField("detail", e.Detail()).
				Debug("event ready")
			e.Ready = time.Now()
			ps.lastQDelay = e.Ready.Sub(e.Received)

			ps.mtx.Lock()
			atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
			for sink := range ps.sinks {
				sink.channel <- e
			}
			ps.mtx.Unlock()
		}
	}()

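	// To make the bound above concrete (numbers hypothetical): with
	// QueueSize=64, at most 64 "select ... from logs" queries run at
	// once no matter how many clients are connected; without the
	// queue, 1000 clients with a per-client buffer of 16 could drive
	// up to 16000 concurrent queries.
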
	var serial uint64
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			logger(nil).Debug("ctx done")
			return

		case <-ticker.C:
			logger(nil).Debug("listener ping")
			ps.pqListener.Ping()

		case pqEvent, ok := <-ps.pqListener.Notify:
			if !ok {
				logger(nil).Debug("pqListener Notify chan closed")
				return
			}
			if pqEvent == nil {
				// pq should call listenerProblem
				// itself in addition to sending us a
				// nil event, so this might be
				// superfluous:
				ps.listenerProblem(-1, nil)
				continue
			}
			if pqEvent.Channel != "logs" {
				logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
				continue
			}
			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
			if err != nil {
				logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
				continue
			}
			serial++
			e := &event{
				LogID:    logID,
				Received: time.Now(),
				Serial:   serial,
				db:       ps.db,
			}
			logger(nil).WithField("event", e).Debug("incoming")
			atomic.AddUint64(&ps.eventsIn, 1)
			ps.queue <- e
		}
	}
}

// NewSink subscribes to the event source. NewSink returns an
// eventSink, whose Channel() method returns a channel: a pointer to
// each subsequent event will be sent to that channel.
//
// The caller must ensure events are received from the sink channel as
// quickly as possible because when one sink stops being ready, all
// other sinks block.
func (ps *pgEventSource) NewSink() eventSink {
	sink := &pgEventSink{
		channel: make(chan *event, 1),
		source:  ps,
	}
	ps.mtx.Lock()
	if ps.sinks == nil {
		ps.sinks = make(map[*pgEventSink]bool)
	}
	ps.sinks[sink] = true
	ps.mtx.Unlock()
	return sink
}

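// Usage sketch (not part of the original file): a subscriber drains
// its sink's channel promptly and calls Stop when done, since a full
// sink channel stalls delivery to every other sink:
//
//	sink := ps.NewSink()
//	defer sink.Stop()
//	for e := range sink.Channel() {
//		_ = e // deliver e to the client here
//	}
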
func (ps *pgEventSource) DB() *sql.DB {
	return ps.db
}

func (ps *pgEventSource) DebugStatus() interface{} {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	blocked := 0
	for sink := range ps.sinks {
		blocked += len(sink.channel)
	}
	return map[string]interface{}{
		"EventsIn":     atomic.LoadUint64(&ps.eventsIn),
		"EventsOut":    atomic.LoadUint64(&ps.eventsOut),
		"Queue":        len(ps.queue),
		"QueueLimit":   cap(ps.queue),
		"QueueDelay":   stats.Duration(ps.lastQDelay),
		"Sinks":        len(ps.sinks),
		"SinksBlocked": blocked,
	}
}

type pgEventSink struct {
	channel chan *event
	source  *pgEventSource
}

func (sink *pgEventSink) Channel() <-chan *event {
	return sink.channel
}

// Stop sending events to the sink's channel.
func (sink *pgEventSink) Stop() {
	go func() {
		// Ensure this sink cannot fill up and block the
		// server-side queue (which otherwise could in turn
		// block our mtx.Lock() here)
		for range sink.channel {
		}
	}()
	sink.source.mtx.Lock()
	if _, ok := sink.source.sinks[sink]; ok {
		delete(sink.source.sinks, sink)
		close(sink.channel)
	}
	sink.source.mtx.Unlock()
}
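
// Usage note (illustrative): Stop may be called even while undelivered
// events remain buffered in the sink's channel; the drain goroutine
// above keeps receiving until close(sink.channel), so the dispatcher
// in Run cannot block on a stopped sink:
//
//	sink := ps.NewSink()
//	// ... consume sink.Channel() for a while ...
//	sink.Stop() // drains, unsubscribes, and closes the channel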