import (
	"database/sql"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/stats"

	"github.com/lib/pq"
)

type pgConfig map[string]string

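// ConnectionString assembles the pgConfig map into a libpq-style
// connection string (e.g. "dbname='arvados' user='ws' "), escaping
// backslashes and single quotes in each value.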
func (c pgConfig) ConnectionString() string {
	s := ""
	for k, v := range c {
		s += k
		s += "='"
		s += strings.Replace(
			strings.Replace(v, `\`, `\\`, -1),
			`'`, `\'`, -1)
		s += "' "
	}
	return s
}

type pgEventSource struct {
	DataSource string
	QueueSize  int

	db         *sql.DB
	pqListener *pq.Listener
	queue      chan *event
	sinks      map[*pgEventSink]bool
	setupOnce  sync.Once
	mtx        sync.Mutex
	shutdown   chan error

	lastQDelay time.Duration
	eventsIn   uint64
	eventsOut  uint64
}

// Compile-time check that pgEventSource implements debugStatuser.
var _ debugStatuser = (*pgEventSource)(nil)

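// setup opens the database, subscribes to the "logs" notification
// channel with LISTEN, and starts the run() loop. It runs at most
// once, via setupOnce.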
func (ps *pgEventSource) setup() {
	ps.shutdown = make(chan error, 1)
	ps.sinks = make(map[*pgEventSink]bool)

	db, err := sql.Open("postgres", ps.DataSource)
	if err != nil {
		logger(nil).WithError(err).Fatal("sql.Open failed")
	}
	if err = db.Ping(); err != nil {
		logger(nil).WithError(err).Fatal("db.Ping failed")
	}
	ps.db = db

	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
		if err != nil {
			// Until we have a mechanism for catching up
			// on missed events, we cannot recover from a
			// dropped connection without breaking our
			// promises to clients.
			logger(nil).WithError(err).Error("listener problem")
			ps.shutdown <- err
		}
	})
	err = ps.pqListener.Listen("logs")
	if err != nil {
		logger(nil).WithError(err).Fatal("pq Listen failed")
	}
	logger(nil).Debug("pgEventSource listening")

	go ps.run()
}

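// run buffers incoming notifications in ps.queue. A dispatcher
// goroutine fans each queued event out to all subscribed sinks; the
// main loop turns LISTEN/NOTIFY payloads into events, pings the
// listener once a minute, and exits on shutdown.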
func (ps *pgEventSource) run() {
	ps.queue = make(chan *event, ps.QueueSize)

	go func() {
		for e := range ps.queue {
			// Wait for the "select ... from logs" call to
			// finish. This limits max concurrent queries
			// to ps.QueueSize. Without this, max
			// concurrent queries would be bounded by
			// client_count X client_queue_size.
			e.Detail()

			logger(nil).
				WithField("serial", e.Serial).
				WithField("detail", e.Detail()).
				Debug("event ready")
			e.Ready = time.Now()
			ps.lastQDelay = e.Ready.Sub(e.Received)

			ps.mtx.Lock()
			atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
			for sink := range ps.sinks {
				sink.channel <- e
			}
			ps.mtx.Unlock()
		}
	}()

	var serial uint64
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case err, ok := <-ps.shutdown:
			if ok {
				logger(nil).WithError(err).Info("shutdown")
			}
			close(ps.queue)
			return

		case <-ticker.C:
			logger(nil).Debug("listener ping")
			ps.pqListener.Ping()

		case pqEvent, ok := <-ps.pqListener.Notify:
			if !ok {
				close(ps.queue)
				return
			}
			if pqEvent.Channel != "logs" {
				continue
			}
			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
			if err != nil {
				logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
				continue
			}
			serial++
			e := &event{
				LogID:    logID,
				Received: time.Now(),
				Serial:   serial,
				db:       ps.db,
			}
			logger(nil).WithField("event", e).Debug("incoming")
			atomic.AddUint64(&ps.eventsIn, 1)
			ps.queue <- e
			// Start the detail query now; the dispatcher
			// goroutine above waits for it to finish.
			go e.Detail()
		}
	}
}

// NewSink subscribes to the event source. NewSink returns an
// eventSink, whose Channel() method returns a channel: a pointer to
// each subsequent event will be sent to that channel.
//
// The caller must ensure events are received from the sink channel as
// quickly as possible because when one sink stops being ready, all
// other sinks block.
func (ps *pgEventSource) NewSink() eventSink {
	ps.setupOnce.Do(ps.setup)
	sink := &pgEventSink{
		channel: make(chan *event, 1),
		source:  ps,
	}
	ps.mtx.Lock()
	ps.sinks[sink] = true
	ps.mtx.Unlock()
	return sink
}

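// DB returns the shared database handle, opening it on first use.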
func (ps *pgEventSource) DB() *sql.DB {
	ps.setupOnce.Do(ps.setup)
	return ps.db
}

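// DebugStatus reports event counters and queue/sink state for the
// debugStatuser interface.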
func (ps *pgEventSource) DebugStatus() interface{} {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	blocked := 0
	for sink := range ps.sinks {
		blocked += len(sink.channel)
	}
	return map[string]interface{}{
		"EventsIn":     atomic.LoadUint64(&ps.eventsIn),
		"EventsOut":    atomic.LoadUint64(&ps.eventsOut),
		"Queue":        len(ps.queue),
		"QueueLimit":   cap(ps.queue),
		"QueueDelay":   stats.Duration(ps.lastQDelay),
		"Sinks":        len(ps.sinks),
		"SinksBlocked": blocked,
	}
}

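// A pgEventSink is one subscriber's buffered connection to the event
// source.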
type pgEventSink struct {
	channel chan *event
	source  *pgEventSource
}

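// Channel returns the channel on which this sink's events are
// delivered.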
func (sink *pgEventSink) Channel() <-chan *event {
	return sink.channel
}

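// Stop unsubscribes the sink: it removes the sink from the source's
// sink set, draining the channel meanwhile so the dispatcher cannot
// block on it.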
func (sink *pgEventSink) Stop() {
	go func() {
		// Ensure this sink cannot fill up and block the
		// server-side queue (which otherwise could in turn
		// block our mtx.Lock() here)
		for range sink.channel {
		}
	}()
	sink.source.mtx.Lock()
	delete(sink.source.sinks, sink)
	sink.source.mtx.Unlock()
	close(sink.channel)
}

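// The function below is an illustrative sketch rather than part of
// the original source: it shows the calling pattern the NewSink
// comment asks for (subscribe, receive promptly, Stop when done),
// assuming the eventSink interface exposes the Channel and Stop
// methods implemented above.
func exampleConsumeEvents(ps *pgEventSource, done <-chan struct{}) {
	sink := ps.NewSink()
	defer sink.Stop()
	for {
		select {
		case e := <-sink.Channel():
			// Handle each event quickly: a slow consumer fills
			// the 1-slot buffer and blocks delivery to every
			// other sink.
			logger(nil).WithField("serial", e.Serial).Debug("handled event")
		case <-done:
			return
		}
	}
}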