X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cec8e2705d260c9df1042858941419a3b9160c0e..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 309dab7a40..3593c3aebd 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -2,24 +2,29 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package ws import ( "context" "database/sql" + "errors" + "fmt" "strconv" "sync" - "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/stats" + "git.arvados.org/arvados.git/sdk/go/stats" "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) type pgEventSource struct { DataSource string MaxOpenConns int QueueSize int + Logger logrus.FieldLogger + Reg *prometheus.Registry db *sql.DB pqListener *pq.Listener @@ -28,8 +33,8 @@ type pgEventSource struct { mtx sync.Mutex lastQDelay time.Duration - eventsIn uint64 - eventsOut uint64 + eventsIn prometheus.Counter + eventsOut prometheus.Counter cancel func() @@ -37,18 +42,16 @@ type pgEventSource struct { ready chan bool } -var _ debugStatuser = (*pgEventSource)(nil) - func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { if et == pq.ListenerEventConnected { - logger(nil).Debug("pgEventSource connected") + ps.Logger.Debug("pgEventSource connected") return } // Until we have a mechanism for catching up on missed events, // we cannot recover from a dropped connection without // breaking our promises to clients. - logger(nil). + ps.Logger. WithField("eventType", et). WithError(err). Error("listener problem") @@ -57,6 +60,95 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { func (ps *pgEventSource) setup() { ps.ready = make(chan bool) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "queue_len", + Help: "Current number of events in queue", + }, func() float64 { return float64(len(ps.queue)) })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "queue_cap", + Help: "Event queue capacity", + }, func() float64 { return float64(cap(ps.queue)) })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "queue_delay", + Help: "Queue delay of the last emitted event", + }, func() float64 { return ps.lastQDelay.Seconds() })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "sinks", + Help: "Number of active sinks (connections)", + }, func() float64 { return float64(len(ps.sinks)) })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "sinks_blocked", + Help: "Number of sinks (connections) that are busy and blocking the main event stream", + }, func() float64 { + ps.mtx.Lock() + defer ps.mtx.Unlock() + blocked := 0 + for sink := range ps.sinks { + blocked += len(sink.channel) + } + return float64(blocked) + })) + ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "events_in", + Help: "Number of events received from postgresql notify channel", + }) + ps.Reg.MustRegister(ps.eventsIn) + ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "events_out", + Help: "Number of events sent to client sessions (before filtering)", + }) + ps.Reg.MustRegister(ps.eventsOut) + + maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "db_max_connections", + Help: "Maximum number of open connections to the database", + }) + ps.Reg.MustRegister(maxConnections) + openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "db_open_connections", + Help: "Open connections to the database", + }, []string{"inuse"}) + ps.Reg.MustRegister(openConnections) + + updateDBStats := func() { + stats := ps.db.Stats() + maxConnections.Set(float64(stats.MaxOpenConnections)) + openConnections.WithLabelValues("0").Set(float64(stats.Idle)) + openConnections.WithLabelValues("1").Set(float64(stats.InUse)) + } + go func() { + <-ps.ready + if ps.db == nil { + return + } + updateDBStats() + for range time.Tick(time.Second) { + updateDBStats() + } + }() } // Close stops listening for new events and disconnects all clients. @@ -74,8 +166,8 @@ func (ps *pgEventSource) WaitReady() { // Run listens for event notifications on the "logs" channel and sends // them to all subscribers. func (ps *pgEventSource) Run() { - logger(nil).Debug("pgEventSource Run starting") - defer logger(nil).Debug("pgEventSource Run finished") + ps.Logger.Debug("pgEventSource Run starting") + defer ps.Logger.Debug("pgEventSource Run finished") ps.setupOnce.Do(ps.setup) ready := ps.ready @@ -101,15 +193,15 @@ func (ps *pgEventSource) Run() { db, err := sql.Open("postgres", ps.DataSource) if err != nil { - logger(nil).WithError(err).Error("sql.Open failed") + ps.Logger.WithError(err).Error("sql.Open failed") return } if ps.MaxOpenConns <= 0 { - logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file") + ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file") } db.SetMaxOpenConns(ps.MaxOpenConns) if err = db.Ping(); err != nil { - logger(nil).WithError(err).Error("db.Ping failed") + ps.Logger.WithError(err).Error("db.Ping failed") return } ps.db = db @@ -117,11 +209,11 @@ func (ps *pgEventSource) Run() { ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem) err = ps.pqListener.Listen("logs") if err != nil { - logger(nil).WithError(err).Error("pq Listen failed") + ps.Logger.WithError(err).Error("pq Listen failed") return } defer ps.pqListener.Close() - logger(nil).Debug("pq Listen setup done") + ps.Logger.Debug("pq Listen setup done") close(ready) // Avoid double-close in deferred func @@ -139,7 +231,7 @@ func (ps *pgEventSource) Run() { // client_count X client_queue_size. e.Detail() - logger(nil). + ps.Logger. WithField("serial", e.Serial). WithField("detail", e.Detail()). Debug("event ready") @@ -147,9 +239,9 @@ func (ps *pgEventSource) Run() { ps.lastQDelay = e.Ready.Sub(e.Received) ps.mtx.Lock() - atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks))) for sink := range ps.sinks { sink.channel <- e + ps.eventsOut.Inc() } ps.mtx.Unlock() } @@ -161,16 +253,20 @@ func (ps *pgEventSource) Run() { for { select { case <-ctx.Done(): - logger(nil).Debug("ctx done") + ps.Logger.Debug("ctx done") return case <-ticker.C: - logger(nil).Debug("listener ping") - ps.pqListener.Ping() + ps.Logger.Debug("listener ping") + err := ps.pqListener.Ping() + if err != nil { + ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err)) + continue + } case pqEvent, ok := <-ps.pqListener.Notify: if !ok { - logger(nil).Debug("pqListener Notify chan closed") + ps.Logger.Error("pqListener Notify chan closed") return } if pqEvent == nil { @@ -178,16 +274,16 @@ func (ps *pgEventSource) Run() { // itself in addition to sending us a // nil event, so this might be // superfluous: - ps.listenerProblem(-1, nil) + ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event")) continue } if pqEvent.Channel != "logs" { - logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel") + ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel") continue } logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) if err != nil { - logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload") + ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload") continue } serial++ @@ -196,9 +292,10 @@ func (ps *pgEventSource) Run() { Received: time.Now(), Serial: serial, db: ps.db, + logger: ps.Logger, } - logger(nil).WithField("event", e).Debug("incoming") - atomic.AddUint64(&ps.eventsIn, 1) + ps.Logger.WithField("event", e).Debug("incoming") + ps.eventsIn.Inc() ps.queue <- e go e.Detail() } @@ -232,6 +329,9 @@ func (ps *pgEventSource) DB() *sql.DB { } func (ps *pgEventSource) DBHealth() error { + if ps.db == nil { + return errors.New("database not connected") + } ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) defer cancel() var i int @@ -246,8 +346,6 @@ func (ps *pgEventSource) DebugStatus() interface{} { blocked += len(sink.channel) } return map[string]interface{}{ - "EventsIn": atomic.LoadUint64(&ps.eventsIn), - "EventsOut": atomic.LoadUint64(&ps.eventsOut), "Queue": len(ps.queue), "QueueLimit": cap(ps.queue), "QueueDelay": stats.Duration(ps.lastQDelay),