12 "git.curoverse.com/arvados.git/sdk/go/stats"
16 type pgConfig map[string]string
18 func (c pgConfig) ConnectionString() string {
24 strings.Replace(v, `\`, `\\`, -1),
31 type pgEventSource struct {
36 pqListener *pq.Listener
38 sinks map[*pgEventSink]bool
41 lastQDelay time.Duration
51 var _ debugStatuser = (*pgEventSource)(nil)
53 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
54 if et == pq.ListenerEventConnected {
55 logger(nil).Debug("pgEventSource connected")
59 // Until we have a mechanism for catching up on missed events,
60 // we cannot recover from a dropped connection without
61 // breaking our promises to clients.
63 WithField("eventType", et).
65 Error("listener problem")
69 func (ps *pgEventSource) setup() {
70 ps.ready = make(chan bool)
73 // waitReady returns when private fields (cancel, db) are available
75 func (ps *pgEventSource) waitReady() {
76 ps.setupOnce.Do(ps.setup)
80 // Run listens for event notifications on the "logs" channel and sends
81 // them to all subscribers.
82 func (ps *pgEventSource) Run() {
83 logger(nil).Debug("pgEventSource Run starting")
84 defer logger(nil).Debug("pgEventSource Run finished")
86 ps.setupOnce.Do(ps.setup)
88 ctx, cancel := context.WithCancel(context.Background())
93 // Disconnect all clients
95 for sink := range ps.sinks {
102 db, err := sql.Open("postgres", ps.DataSource)
104 logger(nil).WithError(err).Fatal("sql.Open failed")
107 if err = db.Ping(); err != nil {
108 logger(nil).WithError(err).Fatal("db.Ping failed")
113 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
114 err = ps.pqListener.Listen("logs")
116 logger(nil).WithError(err).Fatal("pq Listen failed")
118 defer ps.pqListener.Close()
119 logger(nil).Debug("pq Listen setup done")
123 ps.queue = make(chan *event, ps.QueueSize)
124 defer close(ps.queue)
127 for e := range ps.queue {
128 // Wait for the "select ... from logs" call to
129 // finish. This limits max concurrent queries
130 // to ps.QueueSize. Without this, max
131 // concurrent queries would be bounded by
132 // client_count X client_queue_size.
136 WithField("serial", e.Serial).
137 WithField("detail", e.Detail()).
140 ps.lastQDelay = e.Ready.Sub(e.Received)
143 atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
144 for sink := range ps.sinks {
152 ticker := time.NewTicker(time.Minute)
157 logger(nil).Debug("ctx done")
161 logger(nil).Debug("listener ping")
164 case pqEvent, ok := <-ps.pqListener.Notify:
166 logger(nil).Debug("pqListener Notify chan closed")
170 // pq should call listenerProblem
171 // itself in addition to sending us a
172 // nil event, so this might be
174 ps.listenerProblem(-1, nil)
177 if pqEvent.Channel != "logs" {
178 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
181 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
183 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
189 Received: time.Now(),
193 logger(nil).WithField("event", e).Debug("incoming")
194 atomic.AddUint64(&ps.eventsIn, 1)
201 // NewSink subscribes to the event source. NewSink returns an
202 // eventSink, whose Channel() method returns a channel: a pointer to
203 // each subsequent event will be sent to that channel.
205 // The caller must ensure events are received from the sink channel as
206 // quickly as possible because when one sink stops being ready, all
207 // other sinks block.
208 func (ps *pgEventSource) NewSink() eventSink {
209 sink := &pgEventSink{
210 channel: make(chan *event, 1),
215 ps.sinks = make(map[*pgEventSink]bool)
217 ps.sinks[sink] = true
222 func (ps *pgEventSource) DB() *sql.DB {
226 func (ps *pgEventSource) DebugStatus() interface{} {
228 defer ps.mtx.Unlock()
230 for sink := range ps.sinks {
231 blocked += len(sink.channel)
233 return map[string]interface{}{
234 "EventsIn": atomic.LoadUint64(&ps.eventsIn),
235 "EventsOut": atomic.LoadUint64(&ps.eventsOut),
236 "Queue": len(ps.queue),
237 "QueueLimit": cap(ps.queue),
238 "QueueDelay": stats.Duration(ps.lastQDelay),
239 "Sinks": len(ps.sinks),
240 "SinksBlocked": blocked,
244 type pgEventSink struct {
246 source *pgEventSource
249 func (sink *pgEventSink) Channel() <-chan *event {
253 // Stop sending events to the sink's channel.
254 func (sink *pgEventSink) Stop() {
256 // Ensure this sink cannot fill up and block the
257 // server-side queue (which otherwise could in turn
258 // block our mtx.Lock() here)
259 for _ = range sink.channel {
262 sink.source.mtx.Lock()
263 if _, ok := sink.source.sinks[sink]; ok {
264 delete(sink.source.sinks, sink)
267 sink.source.mtx.Unlock()