1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.curoverse.com/arvados.git/sdk/go/stats"
19 type pgEventSource struct {
// pqListener is the Postgres LISTEN/NOTIFY connection, created in Run
// via pq.NewListener.
25 pqListener *pq.Listener
// sinks is the current set of subscribers (added in NewSink, removed
// in pgEventSink.Stop). NOTE(review): access appears to be guarded by
// ps.mtx (see DebugStatus / Stop) — confirm against full source.
27 sinks map[*pgEventSink]bool
// lastQDelay is the Ready-minus-Received latency of the most recently
// dispatched event; reported as "QueueDelay" in DebugStatus.
30 lastQDelay time.Duration
// Compile-time assertion that *pgEventSource implements the
// debugStatuser interface (see DebugStatus below).
40 var _ debugStatuser = (*pgEventSource)(nil)
// listenerProblem is the connection-state callback passed to
// pq.NewListener (see Run). A "connected" event is merely logged at
// debug level; any other event type is logged as an error, because a
// dropped LISTEN connection means notifications may have been missed.
42 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
43 if et == pq.ListenerEventConnected {
44 logger(nil).Debug("pgEventSource connected")
48 // Until we have a mechanism for catching up on missed events,
49 // we cannot recover from a dropped connection without
50 // breaking our promises to clients.
52 WithField("eventType", et).
54 Error("listener problem")
// setup performs one-time initialization of the event source's
// channels; it is only ever invoked via ps.setupOnce.Do(ps.setup)
// (see WaitReady and Run), so it runs at most once.
58 func (ps *pgEventSource) setup() {
59 ps.ready = make(chan bool)
62 // Close stops listening for new events and disconnects all clients.
// NOTE(review): body not shown in this view — presumably cancels the
// context created in Run; confirm against full source.
63 func (ps *pgEventSource) Close() {
68 // WaitReady returns when the event listener is connected.
69 func (ps *pgEventSource) WaitReady() {
// Run setup exactly once (sync.Once) so ps.ready exists before this
// method waits on it, even if Run has not been called yet.
70 ps.setupOnce.Do(ps.setup)
74 // Run listens for event notifications on the "logs" channel and sends
75 // them to all subscribers.
76 func (ps *pgEventSource) Run() {
77 logger(nil).Debug("pgEventSource Run starting")
78 defer logger(nil).Debug("pgEventSource Run finished")
// Idempotent one-time initialization (shared with WaitReady).
80 ps.setupOnce.Do(ps.setup)
// ctx/cancel control the lifetime of this Run invocation; the select
// loop below exits when ctx is done.
88 ctx, cancel := context.WithCancel(context.Background())
93 // Disconnect all clients
95 for sink := range ps.sinks {
// Open the SQL connection used for fetching log rows and for
// DB()/DBHealth(); an open failure is logged and (presumably) aborts
// Run — error path partially elided in this view.
102 db, err := sql.Open("postgres", ps.DataSource)
104 logger(nil).WithError(err).Error("sql.Open failed")
// MaxOpenConns <= 0 means no pool limit; warn the operator, since an
// unbounded pool can exhaust Postgres connections.
107 if ps.MaxOpenConns <= 0 {
108 logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
110 db.SetMaxOpenConns(ps.MaxOpenConns)
// Verify the connection actually works before starting the listener.
111 if err = db.Ping(); err != nil {
112 logger(nil).WithError(err).Error("db.Ping failed")
// LISTEN connection: reconnect backoff between 1s and 1min;
// listenerProblem receives connection-state callbacks.
117 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
118 err = ps.pqListener.Listen("logs")
120 logger(nil).WithError(err).Error("pq Listen failed")
123 defer ps.pqListener.Close()
124 logger(nil).Debug("pq Listen setup done")
127 // Avoid double-close in deferred func
// queue buffers incoming events; its capacity (QueueSize) also bounds
// concurrent log-row queries — see the comment in the loop below.
130 ps.queue = make(chan *event, ps.QueueSize)
131 defer close(ps.queue)
// Dispatch loop: drains the queue and fans each event out to every
// sink. NOTE(review): appears to run in its own goroutine (launch
// line elided) — confirm against full source.
134 for e := range ps.queue {
135 // Wait for the "select ... from logs" call to
136 // finish. This limits max concurrent queries
137 // to ps.QueueSize. Without this, max
138 // concurrent queries would be bounded by
139 // client_count X client_queue_size.
143 WithField("serial", e.Serial).
144 WithField("detail", e.Detail()).
// Record queue latency for DebugStatus ("QueueDelay").
147 ps.lastQDelay = e.Ready.Sub(e.Received)
// One "event out" per sink per event.
150 atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
151 for sink := range ps.sinks {
// Main loop: a once-a-minute ticker drives listener pings; the select
// below also handles ctx cancellation and incoming notifications.
159 ticker := time.NewTicker(time.Minute)
164 logger(nil).Debug("ctx done")
168 logger(nil).Debug("listener ping")
171 case pqEvent, ok := <-ps.pqListener.Notify:
173 logger(nil).Debug("pqListener Notify chan closed")
// A nil notification signals a connection problem detected by pq.
177 // pq should call listenerProblem
178 // itself in addition to sending us a
179 // nil event, so this might be
181 ps.listenerProblem(-1, nil)
// Sanity check: we only LISTEN on "logs".
184 if pqEvent.Channel != "logs" {
185 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
// The notify payload is the log ID as a decimal string.
188 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
190 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
196 Received: time.Now(),
200 logger(nil).WithField("event", e).Debug("incoming")
201 atomic.AddUint64(&ps.eventsIn, 1)
208 // NewSink subscribes to the event source. NewSink returns an
209 // eventSink, whose Channel() method returns a channel: a pointer to
210 // each subsequent event will be sent to that channel.
212 // The caller must ensure events are received from the sink channel as
213 // quickly as possible because when one sink stops being ready, all
214 // other sinks block.
215 func (ps *pgEventSource) NewSink() eventSink {
216 sink := &pgEventSink{
// Buffer of 1: a sink can hold one undelivered event before the
// dispatcher blocks on it (hence the warning above).
217 channel: make(chan *event, 1),
// Lazily allocate the sinks map on first subscription.
222 ps.sinks = make(map[*pgEventSink]bool)
224 ps.sinks[sink] = true
// DB returns the event source's database handle (presumably ps.db,
// opened in Run — body elided in this view).
229 func (ps *pgEventSource) DB() *sql.DB {
// DBHealth checks database connectivity by running a trivial
// "SELECT 1" query with a one-second deadline. It returns nil when
// the query succeeds, or the query/context error otherwise.
234 func (ps *pgEventSource) DBHealth() error {
235 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
238 return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
// DebugStatus reports internal counters and queue state for
// debugging/monitoring (satisfies the debugStatuser interface).
241 func (ps *pgEventSource) DebugStatus() interface{} {
// Hold ps.mtx while reading ps.sinks (Lock call elided in this view).
243 defer ps.mtx.Unlock()
// "SinksBlocked" counts events currently buffered in sink channels,
// i.e. events delivered but not yet consumed by clients.
245 for sink := range ps.sinks {
246 blocked += len(sink.channel)
248 return map[string]interface{}{
249 "EventsIn": atomic.LoadUint64(&ps.eventsIn),
250 "EventsOut": atomic.LoadUint64(&ps.eventsOut),
251 "Queue": len(ps.queue),
252 "QueueLimit": cap(ps.queue),
253 "QueueDelay": stats.Duration(ps.lastQDelay),
254 "Sinks": len(ps.sinks),
255 "SinksBlocked": blocked,
256 "DBStats": ps.db.Stats(),
// pgEventSink is a single subscriber to a pgEventSource; created by
// NewSink and unregistered by Stop.
260 type pgEventSink struct {
// source is the pgEventSource this sink is registered with.
262 source *pgEventSource
// Channel returns the receive side of the sink's event channel;
// callers read incoming *event pointers from it (see NewSink).
265 func (sink *pgEventSink) Channel() <-chan *event {
269 // Stop sending events to the sink's channel.
270 func (sink *pgEventSink) Stop() {
// Drain the sink channel so the dispatcher can never block sending to
// this sink. NOTE(review): this drain presumably runs in a goroutine
// (launch line elided) — otherwise the loop below could not terminate
// until the channel is closed; confirm against full source.
272 // Ensure this sink cannot fill up and block the
273 // server-side queue (which otherwise could in turn
274 // block our mtx.Lock() here)
275 for range sink.channel {
// Unregister from the source under the source's mutex; the membership
// check makes a second Stop call harmless.
278 sink.source.mtx.Lock()
279 if _, ok := sink.source.sinks[sink]; ok {
280 delete(sink.source.sinks, sink)
283 sink.source.mtx.Unlock()