1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/sdk/go/stats"
18 "github.com/prometheus/client_golang/prometheus"
19 "github.com/sirupsen/logrus"
// listenerPingInterval is the period of the ticker that Run creates with
// time.NewTicker. NOTE(review): presumably each tick triggers the
// pqListener.Ping call in Run -- confirm.
23 listenerPingInterval = time.Minute
// pgEventSource listens for PostgreSQL notifications on the "logs"
// channel (see Run) and sends the resulting events to all subscribed
// sinks (see NewSink).
27 type pgEventSource struct {
// Logger receives the diagnostics emitted by the event source.
31 Logger logrus.FieldLogger
// Reg is the Prometheus registry on which setup registers the event
// source's metrics.
32 Reg *prometheus.Registry
// pqListener is the pq listener created in Run and subscribed to the
// "logs" channel.
35 pqListener *pq.Listener
// sinks is the set of subscribed sinks: NewSink adds entries and
// pgEventSink.Stop deletes them.
37 sinks map[*pgEventSink]bool
// lastQDelay is Ready minus Received for the event most recently
// taken from the queue in Run; it is exported as a gauge by setup and
// as "QueueDelay" by DebugStatus.
40 lastQDelay time.Duration
// eventsIn counts events received from the postgresql notify channel.
41 eventsIn prometheus.Counter
// eventsOut counts events sent to client sessions (before filtering).
42 eventsOut prometheus.Counter
// listenerProblem is the event callback given to pq.NewListener in Run.
// Run also calls it directly, with event type -1, when a listener ping
// fails or a nil notification is received.
//
// A pq.ListenerEventConnected event is only logged at debug level. Any
// other event is logged as an error ("listener problem") carrying the
// event type in the "eventType" field.
// NOTE(review): the comment in the body says a dropped connection cannot
// be recovered from, so presumably the event source is shut down after
// logging -- confirm.
50 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
51 if et == pq.ListenerEventConnected {
52 ps.Logger.Debug("pgEventSource connected")
56 // Until we have a mechanism for catching up on missed events,
57 // we cannot recover from a dropped connection without
58 // breaking our promises to clients.
60 WithField("eventType", et).
62 Error("listener problem")
// setup creates the ps.ready channel and registers the event source's
// Prometheus metrics on ps.Reg: queue length and capacity, last queue
// delay, sink counts, the eventsIn/eventsOut counters, and database
// connection gauges. It is invoked through ps.setupOnce by both
// WaitReady and Run.
//
// Registration uses MustRegister, which panics if a collector cannot be
// registered.
66 func (ps *pgEventSource) setup() {
67 ps.ready = make(chan bool)
// ps.queue is only created in Run; until then len and cap of the nil
// channel report 0.
68 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
73 Help: "Current number of events in queue",
74 }, func() float64 { return float64(len(ps.queue)) }))
75 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
80 Help: "Event queue capacity",
81 }, func() float64 { return float64(cap(ps.queue)) }))
// NOTE(review): this gauge and the sink-count gauge below read
// ps.lastQDelay and len(ps.sinks) without taking ps.mtx, unlike the
// sinks_blocked gauge -- possible data race with Run/NewSink/Stop;
// confirm.
82 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
87 Help: "Queue delay of the last emitted event",
88 }, func() float64 { return ps.lastQDelay.Seconds() }))
89 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
94 Help: "Number of active sinks (connections)",
95 }, func() float64 { return float64(len(ps.sinks)) }))
96 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
100 Name: "sinks_blocked",
101 Help: "Number of sinks (connections) that are busy and blocking the main event stream",
104 defer ps.mtx.Unlock()
// The reported value is the total number of events currently buffered
// in sink channels.
106 for sink := range ps.sinks {
107 blocked += len(sink.channel)
109 return float64(blocked)
111 ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
112 Namespace: "arvados",
115 Help: "Number of events received from postgresql notify channel",
117 ps.Reg.MustRegister(ps.eventsIn)
118 ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
119 Namespace: "arvados",
122 Help: "Number of events sent to client sessions (before filtering)",
124 ps.Reg.MustRegister(ps.eventsOut)
126 maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
127 Namespace: "arvados",
129 Name: "db_max_connections",
130 Help: "Maximum number of open connections to the database",
132 ps.Reg.MustRegister(maxConnections)
// db_open_connections is split by the "inuse" label: "0" carries the
// idle connection count and "1" the in-use count (see updateDBStats).
133 openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
134 Namespace: "arvados",
136 Name: "db_open_connections",
137 Help: "Open connections to the database",
138 }, []string{"inuse"})
139 ps.Reg.MustRegister(openConnections)
// updateDBStats copies the current ps.db.Stats() values into the
// connection gauges.
141 updateDBStats := func() {
142 stats := ps.db.Stats()
143 maxConnections.Set(float64(stats.MaxOpenConnections))
144 openConnections.WithLabelValues("0").Set(float64(stats.Idle))
145 openConnections.WithLabelValues("1").Set(float64(stats.InUse))
// NOTE(review): a time.Tick ticker cannot be stopped and this loop has
// no visible exit condition -- confirm it is meant to run for the life
// of the process.
153 for range time.Tick(time.Second) {
159 // Close stops listening for new events and disconnects all clients.
// NOTE(review): Run contains its own "Disconnect all clients" cleanup
// step; presumably Close works by making Run exit rather than by
// disconnecting sinks itself -- confirm.
160 func (ps *pgEventSource) Close() {
165 // WaitReady returns when the event listener is connected.
//
// It first makes sure setup has run (through ps.setupOnce), so the
// ps.ready channel created there exists even if Run has not been called
// yet.
166 func (ps *pgEventSource) WaitReady() {
167 ps.setupOnce.Do(ps.setup)
171 // Run listens for event notifications on the "logs" channel and sends
172 // them to all subscribers.
//
// Run makes sure setup has run, opens a database/sql pool on
// ps.DataSource, starts a pq listener on the same data source, and
// creates the event queue with capacity ps.QueueSize. Failures of
// sql.Open, db.Ping and Listen are logged as errors. Start and finish of
// Run are logged at debug level.
173 func (ps *pgEventSource) Run() {
174 ps.Logger.Debug("pgEventSource Run starting")
175 defer ps.Logger.Debug("pgEventSource Run finished")
177 ps.setupOnce.Do(ps.setup)
185 ctx, cancel := context.WithCancel(context.Background())
190 // Disconnect all clients
192 for sink := range ps.sinks {
199 db, err := sql.Open("postgres", ps.DataSource)
201 ps.Logger.WithError(err).Error("sql.Open failed")
// A non-positive limit only produces a warning; the value is still
// passed to SetMaxOpenConns below, where database/sql treats n <= 0 as
// "no limit".
204 if ps.MaxOpenConns <= 0 {
205 ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
207 db.SetMaxOpenConns(ps.MaxOpenConns)
208 if err = db.Ping(); err != nil {
209 ps.Logger.WithError(err).Error("db.Ping failed")
// The listener reconnects with an interval between one second and one
// minute, and reports connection events to ps.listenerProblem.
214 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
215 err = ps.pqListener.Listen("logs")
217 ps.Logger.WithError(err).Error("pq Listen failed")
220 defer ps.pqListener.Close()
221 ps.Logger.Debug("pq Listen setup done")
224 // Avoid double-close in deferred func
227 ps.queue = make(chan *event, ps.QueueSize)
228 defer close(ps.queue)
231 for e := range ps.queue {
232 // Wait for the "select ... from logs" call to
233 // finish. This limits max concurrent queries
234 // to ps.QueueSize. Without this, max
235 // concurrent queries would be bounded by
236 // client_count X client_queue_size.
240 WithField("serial", e.Serial).
241 WithField("detail", e.Detail()).
// Record how long this event took to become ready after it was
// received; setup and DebugStatus report this value.
244 ps.lastQDelay = e.Ready.Sub(e.Received)
247 for sink := range ps.sinks {
258 ticker := time.NewTicker(listenerPingInterval)
263 ps.Logger.Debug("ctx done")
267 ps.Logger.Debug("listener ping")
269 time.Sleep(time.Second / 2)
271 err := ps.pqListener.Ping()
// -1 is never pq.ListenerEventConnected, so listenerProblem takes its
// error-logging path.
273 ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
283 ps.Logger.Debug("ctx done")
286 case pqEvent, ok := <-ps.pqListener.Notify:
288 ps.Logger.Error("pqListener Notify chan closed")
292 // pq should call listenerProblem
293 // itself in addition to sending us a
294 // nil event, so this might be
296 ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
299 if pqEvent.Channel != "logs" {
300 ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
// The notification payload must be a base-10 int64 log ID.
303 logID, err := strconv.ParseInt(pqEvent.Extra, 10, 64)
305 ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
311 Received: time.Now(),
316 ps.Logger.WithField("event", e).Debug("incoming")
324 // NewSink subscribes to the event source. NewSink returns an
325 // eventSink, whose Channel() method returns a channel: a pointer to
326 // each subsequent event will be sent to that channel.
328 // The caller must ensure events are received from the sink channel as
329 // quickly as possible because when one sink stops being ready, all
330 // other sinks block.
331 func (ps *pgEventSource) NewSink() eventSink {
332 sink := &pgEventSink{
// Each sink buffers at most one pending event.
333 channel: make(chan *event, 1),
// The sink set is allocated here rather than in setup.
// NOTE(review): presumably only when ps.sinks is still nil -- confirm.
338 ps.sinks = make(map[*pgEventSink]bool)
// Registering the sink in ps.sinks is what subscribes it.
340 ps.sinks[sink] = true
// DB returns a *sql.DB handle.
// NOTE(review): presumably the connection pool stored in ps.db --
// confirm.
345 func (ps *pgEventSource) DB() *sql.DB {
// DBHealth checks the database connection by running "SELECT 1" through
// ps.db under a context whose deadline is one second from now, and
// returns the error from scanning the result (nil on success).
//
// It can also return the error "database not connected".
// NOTE(review): presumably when ps.db has not been set yet -- confirm.
350 func (ps *pgEventSource) DBHealth() error {
352 return errors.New("database not connected")
354 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
357 return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
// DebugStatus returns a snapshot of the event source's state as a map
// with the keys "Queue", "QueueLimit", "QueueDelay", "Sinks",
// "SinksBlocked" and "DBStats". ps.mtx is released by a deferred Unlock
// when the function returns.
//
// NOTE(review): unlike DBHealth, there is no visible guard for an
// unconnected database before ps.db.Stats() -- confirm callers only use
// this once the source is ready.
360 func (ps *pgEventSource) DebugStatus() interface{} {
362 defer ps.mtx.Unlock()
// blocked is the total number of events currently buffered in sink
// channels.
364 for sink := range ps.sinks {
365 blocked += len(sink.channel)
367 return map[string]interface{}{
368 "Queue": len(ps.queue),
369 "QueueLimit": cap(ps.queue),
370 "QueueDelay": stats.Duration(ps.lastQDelay),
371 "Sinks": len(ps.sinks),
372 "SinksBlocked": blocked,
373 "DBStats": ps.db.Stats(),
// pgEventSink is one subscription to a pgEventSource, created by
// pgEventSource.NewSink.
377 type pgEventSink struct {
// source is the event source this sink is registered with; Stop removes
// the sink from source.sinks.
379 source *pgEventSource
// Channel returns a receive-only channel of *event. Per NewSink's
// documentation, a pointer to each subsequent event is sent to it.
382 func (sink *pgEventSink) Channel() <-chan *event {
386 // Stop sending events to the sink's channel.
387 func (sink *pgEventSink) Stop() {
389 // Ensure this sink cannot fill up and block the
390 // server-side queue (which otherwise could in turn
391 // block our mtx.Lock() here)
392 for range sink.channel {
395 sink.source.mtx.Lock()
396 if _, ok := sink.source.sinks[sink]; ok {
397 delete(sink.source.sinks, sink)
400 sink.source.mtx.Unlock()