1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/sdk/go/stats"
18 "github.com/prometheus/client_golang/prometheus"
19 "github.com/sirupsen/logrus"
// pgEventSource listens for PostgreSQL notifications on the "logs"
// channel (see Run) and fans the resulting events out to subscribed
// sinks (see NewSink).
22 type pgEventSource struct {
// Logger receives debug/warning/error messages from the source.
26 Logger logrus.FieldLogger
// Reg is the Prometheus registry that setup registers this source's
// metrics on.
27 Reg *prometheus.Registry
// pqListener is created in Run and listens on the "logs" channel.
30 pqListener *pq.Listener
// sinks is the set of current subscribers: NewSink adds entries and
// pgEventSink.Stop deletes them while holding source.mtx.
32 sinks map[*pgEventSink]bool
// lastQDelay is the receive-to-ready delay of the most recently
// dispatched event (written in Run, reported by a gauge and by
// DebugStatus).
35 lastQDelay time.Duration
// eventsIn/eventsOut are counters created and registered in setup.
36 eventsIn prometheus.Counter
37 eventsOut prometheus.Counter
// listenerProblem is the event callback passed to pq.NewListener in
// Run. Run also calls it directly, with event type -1, when a listener
// ping fails or a nil notification is received.
//
// A ListenerEventConnected event is only logged at debug level. Any
// other event is logged as an error with the event type attached.
// NOTE(review): the statements that follow the error log are not
// visible here; per the comment below, a dropped connection is
// presumably treated as fatal for this source — confirm.
45 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
46 if et == pq.ListenerEventConnected {
47 ps.Logger.Debug("pgEventSource connected")
51 // Until we have a mechanism for catching up on missed events,
52 // we cannot recover from a dropped connection without
53 // breaking our promises to clients.
55 WithField("eventType", et).
57 Error("listener problem")
// setup creates the ready channel and registers this source's
// Prometheus metrics on ps.Reg. It is invoked through ps.setupOnce
// (from both WaitReady and Run), so the MustRegister calls, which panic
// on a registration error, run at most once per source.
//
// NOTE(review): the gauge callbacks for queue length/capacity, last
// queue delay and sink count read ps.queue, ps.lastQDelay and ps.sinks
// with no visible locking. Run assigns ps.queue and ps.lastQDelay, and
// Stop deletes from ps.sinks under ps.mtx. Confirm whether these reads
// need ps.mtx to avoid data races when metrics are scraped.
61 func (ps *pgEventSource) setup() {
62 ps.ready = make(chan bool)
// Gauge: number of events currently waiting in the queue. Before Run
// creates the queue, ps.queue is presumably nil, and len/cap of a nil
// channel are 0.
63 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
68 Help: "Current number of events in queue",
69 }, func() float64 { return float64(len(ps.queue)) }))
// Gauge: queue capacity (ps.QueueSize once Run has created the queue).
70 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
75 Help: "Event queue capacity",
76 }, func() float64 { return float64(cap(ps.queue)) }))
// Gauge: receive-to-ready delay of the last dispatched event, in seconds.
77 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
82 Help: "Queue delay of the last emitted event",
83 }, func() float64 { return ps.lastQDelay.Seconds() }))
// Gauge: number of registered sinks.
84 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
89 Help: "Number of active sinks (connections)",
90 }, func() float64 { return float64(len(ps.sinks)) }))
// Gauge: sum of buffered events across sinks. Each sink channel has
// capacity 1 (see NewSink), so this counts sinks holding an event they
// have not yet read.
91 ps.Reg.MustRegister(prometheus.NewGaugeFunc(
95 Name: "sinks_blocked",
96 Help: "Number of sinks (connections) that are busy and blocking the main event stream",
101 for sink := range ps.sinks {
102 blocked += len(sink.channel)
104 return float64(blocked)
// Counters for events entering from PostgreSQL and leaving to sinks.
106 ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
107 Namespace: "arvados",
110 Help: "Number of events received from postgresql notify channel",
112 ps.Reg.MustRegister(ps.eventsIn)
113 ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
114 Namespace: "arvados",
117 Help: "Number of events sent to client sessions (before filtering)",
119 ps.Reg.MustRegister(ps.eventsOut)
// Database connection-pool gauges, refreshed by updateDBStats.
121 maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
122 Namespace: "arvados",
124 Name: "db_max_connections",
125 Help: "Maximum number of open connections to the database",
127 ps.Reg.MustRegister(maxConnections)
128 openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
129 Namespace: "arvados",
131 Name: "db_open_connections",
132 Help: "Open connections to the database",
133 }, []string{"inuse"})
134 ps.Reg.MustRegister(openConnections)
// updateDBStats copies sql.DBStats into the gauges: label inuse="0" is
// the idle connection count and inuse="1" is the in-use count.
// NOTE(review): ps.db is opened in Run; if updateDBStats can run before
// ps.db is set, ps.db.Stats() would dereference nil. Confirm it is guarded.
136 updateDBStats := func() {
137 stats := ps.db.Stats()
138 maxConnections.Set(float64(stats.MaxOpenConnections))
139 openConnections.WithLabelValues("0").Set(float64(stats.Idle))
140 openConnections.WithLabelValues("1").Set(float64(stats.InUse))
// Periodic refresh, once per second (the loop body is not visible here;
// presumably it calls updateDBStats). NOTE(review): the time.Tick ticker
// is never stopped, so this loop runs for the life of the process.
148 for range time.Tick(time.Second) {
154 // Close stops listening for new events and disconnects all clients.
// NOTE(review): the body is not visible here; presumably it invokes the
// cancel function associated with the context Run creates — confirm.
155 func (ps *pgEventSource) Close() {
160 // WaitReady returns when the event listener is connected.
// It first ensures setup has run, which creates ps.ready.
// NOTE(review): the wait itself is on lines not visible here;
// presumably it blocks until Run closes ps.ready after the pq listener
// is set up — confirm.
161 func (ps *pgEventSource) WaitReady() {
162 ps.setupOnce.Do(ps.setup)
166 // Run listens for event notifications on the "logs" channel and sends
167 // them to all subscribers.
//
// Visible steps, in order:
//   - run setup once;
//   - create a cancellable context;
//   - open the database (warning when no connection limit is set) and
//     verify it with Ping;
//   - start a pq.Listener on "logs";
//   - create the event queue (capacity ps.QueueSize) and dispatch queued
//     events to every sink;
//   - in the main loop, ping the listener every minute and turn incoming
//     notifications into events.
168 func (ps *pgEventSource) Run() {
169 ps.Logger.Debug("pgEventSource Run starting")
170 defer ps.Logger.Debug("pgEventSource Run finished")
172 ps.setupOnce.Do(ps.setup)
180 ctx, cancel := context.WithCancel(context.Background())
185 // Disconnect all clients
187 for sink := range ps.sinks {
194 db, err := sql.Open("postgres", ps.DataSource)
196 ps.Logger.WithError(err).Error("sql.Open failed")
// database/sql treats MaxOpenConns <= 0 as "no limit", hence the warning.
199 if ps.MaxOpenConns <= 0 {
200 ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
202 db.SetMaxOpenConns(ps.MaxOpenConns)
// sql.Open does not necessarily connect; Ping verifies the database is reachable.
203 if err = db.Ping(); err != nil {
204 ps.Logger.WithError(err).Error("db.Ping failed")
// pq reconnects after at least 1s, backing off to at most 1 minute
// between attempts. Connection state changes are reported to
// listenerProblem.
209 ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
210 err = ps.pqListener.Listen("logs")
212 ps.Logger.WithError(err).Error("pq Listen failed")
215 defer ps.pqListener.Close()
216 ps.Logger.Debug("pq Listen setup done")
219 // Avoid double-close in deferred func
// The queue bounds how many received events can be pending dispatch.
// Closing it on return ends the range loop below.
222 ps.queue = make(chan *event, ps.QueueSize)
223 defer close(ps.queue)
226 for e := range ps.queue {
227 // Wait for the "select ... from logs" call to
228 // finish. This limits max concurrent queries
229 // to ps.QueueSize. Without this, max
230 // concurrent queries would be bounded by
231 // client_count X client_queue_size.
235 WithField("serial", e.Serial).
236 WithField("detail", e.Detail()).
// Time between receiving the notification and the event being ready to send.
239 ps.lastQDelay = e.Ready.Sub(e.Received)
// Deliver to every sink. Sink channels hold only one event, so a sink
// that stops reading blocks delivery to all the others (see NewSink).
242 for sink := range ps.sinks {
// Ping the listener once a minute. A failed ping is reported to
// listenerProblem with event type -1.
251 ticker := time.NewTicker(time.Minute)
256 ps.Logger.Debug("ctx done")
260 ps.Logger.Debug("listener ping")
261 err := ps.pqListener.Ping()
263 ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
// Each notification on "logs" carries a log record ID as a decimal
// uint64 payload. Unexpected channels and unparsable payloads are
// logged as errors.
267 case pqEvent, ok := <-ps.pqListener.Notify:
269 ps.Logger.Error("pqListener Notify chan closed")
273 // pq should call listenerProblem
274 // itself in addition to sending us a
275 // nil event, so this might be
277 ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
280 if pqEvent.Channel != "logs" {
281 ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
284 logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
286 ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
// Received is the start point for the queue-delay measurement above.
292 Received: time.Now(),
297 ps.Logger.WithField("event", e).Debug("incoming")
305 // NewSink subscribes to the event source. NewSink returns an
306 // eventSink, whose Channel() method returns a channel: a pointer to
307 // each subsequent event will be sent to that channel.
309 // The caller must ensure events are received from the sink channel as
310 // quickly as possible because when one sink stops being ready, all
311 // other sinks block.
//
// The sink's channel buffers a single event. The sinks map is created
// lazily on first use.
// NOTE(review): the lock around the map writes is on lines not visible
// here; presumably ps.mtx, matching pgEventSink.Stop — confirm.
312 func (ps *pgEventSource) NewSink() eventSink {
313 sink := &pgEventSink{
314 channel: make(chan *event, 1),
319 ps.sinks = make(map[*pgEventSink]bool)
321 ps.sinks[sink] = true
// DB returns the event source's *sql.DB handle.
// NOTE(review): the body is not visible here; presumably it returns
// ps.db, which is only set once Run has opened the database — confirm.
326 func (ps *pgEventSource) DB() *sql.DB {
// DBHealth reports database reachability. It returns the error
// "database not connected" when there is no usable connection;
// otherwise it returns the result of running "SELECT 1" with a
// one-second deadline.
// NOTE(review): the condition guarding the "not connected" error is not
// visible here; presumably it checks ps.db == nil.
331 func (ps *pgEventSource) DBHealth() error {
333 return errors.New("database not connected")
335 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
338 return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
// DebugStatus returns a snapshot of the source's state for debugging:
//   - queue length and capacity;
//   - the last queue delay;
//   - the number of sinks;
//   - SinksBlocked: the number of events buffered in sink channels;
//   - database connection-pool statistics.
// The snapshot is taken with ps.mtx held; it is released by the
// deferred Unlock.
// NOTE(review): ps.db.Stats() is called unconditionally. If ps.db can
// be nil here (DBHealth has a "database not connected" case), this
// would panic — confirm.
341 func (ps *pgEventSource) DebugStatus() interface{} {
343 defer ps.mtx.Unlock()
345 for sink := range ps.sinks {
346 blocked += len(sink.channel)
348 return map[string]interface{}{
349 "Queue": len(ps.queue),
350 "QueueLimit": cap(ps.queue),
351 "QueueDelay": stats.Duration(ps.lastQDelay),
352 "Sinks": len(ps.sinks),
353 "SinksBlocked": blocked,
354 "DBStats": ps.db.Stats(),
// pgEventSink is a single subscriber to a pgEventSource. Events are
// delivered on its channel, which NewSink creates with capacity 1.
// source points back to the owning pgEventSource so Stop can
// deregister the sink.
358 type pgEventSink struct {
360 source *pgEventSource
// Channel returns the receive-only channel on which this sink's events
// are delivered.
363 func (sink *pgEventSink) Channel() <-chan *event {
367 // Stop sending events to the sink's channel.
//
// Stop first drains the sink's channel. This keeps an unread event from
// blocking the source's dispatch loop while Stop waits for source.mtx.
// It then removes the sink from source.sinks if it is still registered.
// NOTE(review): the lines that start the drain (presumably in a
// goroutine) and that close sink.channel are not visible here — confirm.
368 func (sink *pgEventSink) Stop() {
370 // Ensure this sink cannot fill up and block the
371 // server-side queue (which otherwise could in turn
372 // block our mtx.Lock() here)
373 for range sink.channel {
376 sink.source.mtx.Lock()
377 if _, ok := sink.source.sinks[sink]; ok {
378 delete(sink.source.sinks, sink)
381 sink.source.mtx.Unlock()