1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/stats"
// pgEventSource subscribes to PostgreSQL LISTEN/NOTIFY on the "logs"
// channel and fans incoming events out to registered sinks.
// NOTE(review): several fields are elided in this view (mutex, db
// handle, queue, setupOnce, counters — all referenced by the methods
// below); confirm against the full file.
21 type pgEventSource struct {
// Active LISTEN connection to PostgreSQL (see Run).
27 pqListener *pq.Listener
// Registered subscribers; guarded by the source mutex (see NewSink / Stop).
29 sinks map[*pgEventSink]bool
// Queue latency of the most recently dispatched event (set in Run,
// reported by DebugStatus).
32 lastQDelay time.Duration
// Compile-time assertion that *pgEventSource satisfies debugStatuser.
42 var _ debugStatuser = (*pgEventSource)(nil)
// listenerProblem is the event callback passed to pq.NewListener (see
// Run). A "connected" notification is merely logged at debug level;
// any other listener event is logged as an error because a dropped
// connection means missed events. NOTE(review): the elided lines
// between the two branches presumably terminate or restart the
// source — confirm against the full file.
44 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
45 if et == pq.ListenerEventConnected {
46 logger(nil).Debug("pgEventSource connected")
50 // Until we have a mechanism for catching up on missed events,
51 // we cannot recover from a dropped connection without
52 // breaking our promises to clients.
// Log the event type (and, per the elided neighbors, presumably the
// error) at error level.
54 WithField("eventType", et).
56 Error("listener problem")
// setup performs one-time initialization; it is invoked only through
// ps.setupOnce.Do (see WaitReady and Run), so it runs at most once.
60 func (ps *pgEventSource) setup() {
// ready is closed (by elided code, presumably in Run) to signal that
// the listener is connected — TODO confirm.
61 ps.ready = make(chan bool)
64 // Close stops listening for new events and disconnects all clients.
// NOTE(review): body elided in this view — presumably cancels the
// Run context, which disconnects sinks via the deferred cleanup there.
65 func (ps *pgEventSource) Close() {
70 // WaitReady returns when the event listener is connected.
71 func (ps *pgEventSource) WaitReady() {
// Ensure one-time initialization has run (safe to race with Run).
// NOTE(review): the elided remainder presumably blocks on ps.ready.
72 ps.setupOnce.Do(ps.setup)
76 // Run listens for event notifications on the "logs" channel and sends
77 // them to all subscribers.
//
// It owns the database handle, the pq listener, and the dispatch
// queue; it returns on fatal setup errors or context cancellation.
// NOTE(review): many lines are elided in this view — error-return
// paths, the dispatch goroutine's start, and the select loop's
// structure are partly inferred.
78 func (ps *pgEventSource) Run() {
79 logger(nil).Debug("pgEventSource Run starting")
80 defer logger(nil).Debug("pgEventSource Run finished")
// One-time init (shared with WaitReady).
82 ps.setupOnce.Do(ps.setup)
// Context used to stop the ticker loop below; cancelled on return
// (per the deferred cleanup that follows in elided lines).
90 ctx, cancel := context.WithCancel(context.Background())
95 // Disconnect all clients
// Deferred cleanup: stop every registered sink so no client blocks
// on a dead source.
97 for sink := range ps.sinks {
// Open the query-side DB handle (separate from the NOTIFY listener).
104 db, err := sql.Open("postgres", ps.DataSource)
106 logger(nil).WithError(err).Error("sql.Open failed")
// An unset/zero pool size means unlimited connections — warn, since
// the "select ... from logs" queries below can then pile up.
109 if ps.MaxOpenConns <= 0 {
110 logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
112 db.SetMaxOpenConns(ps.MaxOpenConns)
// Verify connectivity up front rather than failing on first event.
113 if err = db.Ping(); err != nil {
114 logger(nil).WithError(err).Error("db.Ping failed")
// LISTEN connection: retry backoff between 1s and 1m; problems are
// reported through listenerProblem.
119 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
120 err = ps.pqListener.Listen("logs")
122 logger(nil).WithError(err).Error("pq Listen failed")
125 defer ps.pqListener.Close()
126 logger(nil).Debug("pq Listen setup done")
129 // Avoid double-close in deferred func
// Dispatch queue: buffering decouples NOTIFY arrival from the
// per-event DB fetch; closed on return so the dispatch loop ends.
132 ps.queue = make(chan *event, ps.QueueSize)
133 defer close(ps.queue)
// Dispatch loop (elided lines presumably run this in a goroutine —
// TODO confirm): for each queued event, wait for its detail fetch,
// then send it to every sink.
136 for e := range ps.queue {
137 // Wait for the "select ... from logs" call to
138 // finish. This limits max concurrent queries
139 // to ps.QueueSize. Without this, max
140 // concurrent queries would be bounded by
141 // client_count X client_queue_size.
145 WithField("serial", e.Serial).
146 WithField("detail", e.Detail()).
// Record how long the event sat in the queue, for DebugStatus.
149 ps.lastQDelay = e.Ready.Sub(e.Received)
// Count one outgoing copy per sink.
152 atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
153 for sink := range ps.sinks {
// Periodic listener health check.
161 ticker := time.NewTicker(time.Minute)
// Main select loop (structure partly elided): stop on ctx.Done,
// ping the listener on each tick, and handle incoming notifies.
166 logger(nil).Debug("ctx done")
170 logger(nil).Debug("listener ping")
171 err := ps.pqListener.Ping()
// -1 is not a real pq.ListenerEventType; it marks a locally
// detected problem.
173 ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
177 case pqEvent, ok := <-ps.pqListener.Notify:
179 logger(nil).Error("pqListener Notify chan closed")
183 // pq should call listenerProblem
184 // itself in addition to sending us a
185 // nil event, so this might be
187 ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
// We only LISTEN on "logs"; anything else indicates a bug.
190 if pqEvent.Channel != "logs" {
191 logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
// The notify payload is the numeric log ID.
194 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
196 logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
// Build the event (other fields elided) and stamp arrival time for
// the queue-delay metric.
202 Received: time.Now(),
206 logger(nil).WithField("event", e).Debug("incoming")
207 atomic.AddUint64(&ps.eventsIn, 1)
214 // NewSink subscribes to the event source. NewSink returns an
215 // eventSink, whose Channel() method returns a channel: a pointer to
216 // each subsequent event will be sent to that channel.
//
218 // The caller must ensure events are received from the sink channel as
219 // quickly as possible because when one sink stops being ready, all
220 // other sinks block.
// NOTE(review): the mutex lock/unlock around the map update is in
// elided lines — confirm against the full file.
221 func (ps *pgEventSource) NewSink() eventSink {
222 sink := &pgEventSink{
// Buffer of 1 so one slow receive doesn't immediately block the
// dispatcher.
223 channel: make(chan *event, 1),
// Lazily create the sink registry on first subscription.
228 ps.sinks = make(map[*pgEventSink]bool)
230 ps.sinks[sink] = true
// DB returns the query-side database handle. NOTE(review): body
// elided — presumably returns ps.db (the handle used by DBHealth and
// DebugStatus).
235 func (ps *pgEventSource) DB() *sql.DB {
// DBHealth checks database connectivity by running "SELECT 1" with a
// one-second deadline; it returns the query error, if any.
240 func (ps *pgEventSource) DBHealth() error {
241 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
// NOTE(review): elided lines presumably `defer cancel()` and declare
// the scan target `i` — confirm against the full file.
244 return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
// DebugStatus reports internal counters and queue state for the
// debug/status endpoint (satisfies debugStatuser).
247 func (ps *pgEventSource) DebugStatus() interface{} {
// Hold the mutex so the sinks map is stable while we count.
// NOTE(review): the matching Lock() call is in an elided line.
249 defer ps.mtx.Unlock()
// Sum of events sitting unread in sink channels — a proxy for how
// blocked the slowest clients are.
251 for sink := range ps.sinks {
252 blocked += len(sink.channel)
254 return map[string]interface{}{
255 "EventsIn": atomic.LoadUint64(&ps.eventsIn),
256 "EventsOut": atomic.LoadUint64(&ps.eventsOut),
257 "Queue": len(ps.queue),
258 "QueueLimit": cap(ps.queue),
259 "QueueDelay": stats.Duration(ps.lastQDelay),
260 "Sinks": len(ps.sinks),
261 "SinksBlocked": blocked,
262 "DBStats": ps.db.Stats(),
// pgEventSink is a single subscription to a pgEventSource; events are
// delivered on its channel until Stop is called. NOTE(review): the
// channel field itself is declared in an elided line.
266 type pgEventSink struct {
// Back-reference used by Stop to deregister from the source.
268 source *pgEventSource
// Channel returns the receive side of the sink's event stream.
// NOTE(review): body elided — presumably returns sink.channel.
271 func (sink *pgEventSink) Channel() <-chan *event {
275 // Stop sending events to the sink's channel.
//
// NOTE(review): elided lines presumably start the drain loop below in
// a goroutine — confirm against the full file.
276 func (sink *pgEventSink) Stop() {
278 // Ensure this sink cannot fill up and block the
279 // server-side queue (which otherwise could in turn
280 // block our mtx.Lock() here)
// Drain until the channel is closed (by elided deregistration code).
281 for range sink.channel {
// Deregister from the source so the dispatcher stops sending to us.
284 sink.source.mtx.Lock()
285 if _, ok := sink.source.sinks[sink]; ok {
286 delete(sink.source.sinks, sink)
289 sink.source.mtx.Unlock()