X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ba418300c50e1375ca9938562579b7bd6bf9490d..ccb5293993df4d535b3ca1e3224a5a146d8f90c2:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 341464de50..3593c3aebd 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -11,11 +11,11 @@ import ( "fmt" "strconv" "sync" - "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/stats" "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -24,6 +24,7 @@ type pgEventSource struct { MaxOpenConns int QueueSize int Logger logrus.FieldLogger + Reg *prometheus.Registry db *sql.DB pqListener *pq.Listener @@ -32,8 +33,8 @@ type pgEventSource struct { mtx sync.Mutex lastQDelay time.Duration - eventsIn uint64 - eventsOut uint64 + eventsIn prometheus.Counter + eventsOut prometheus.Counter cancel func() @@ -41,8 +42,6 @@ type pgEventSource struct { ready chan bool } -var _ debugStatuser = (*pgEventSource)(nil) - func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { if et == pq.ListenerEventConnected { ps.Logger.Debug("pgEventSource connected") @@ -61,6 +60,95 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { func (ps *pgEventSource) setup() { ps.ready = make(chan bool) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "queue_len", + Help: "Current number of events in queue", + }, func() float64 { return float64(len(ps.queue)) })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "queue_cap", + Help: "Event queue capacity", + }, func() float64 { return float64(cap(ps.queue)) })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "queue_delay", + Help: "Queue delay of the last emitted event", + }, func() float64 { return ps.lastQDelay.Seconds() })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "sinks", + Help: "Number of active sinks (connections)", + }, func() float64 { return float64(len(ps.sinks)) })) + ps.Reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "sinks_blocked", + Help: "Number of sinks (connections) that are busy and blocking the main event stream", + }, func() float64 { + ps.mtx.Lock() + defer ps.mtx.Unlock() + blocked := 0 + for sink := range ps.sinks { + blocked += len(sink.channel) + } + return float64(blocked) + })) + ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "events_in", + Help: "Number of events received from postgresql notify channel", + }) + ps.Reg.MustRegister(ps.eventsIn) + ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "events_out", + Help: "Number of events sent to client sessions (before filtering)", + }) + ps.Reg.MustRegister(ps.eventsOut) + + maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "db_max_connections", + Help: "Maximum number of open connections to the database", + }) + ps.Reg.MustRegister(maxConnections) + openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "ws", + Name: "db_open_connections", + Help: "Open connections to the database", + }, []string{"inuse"}) + ps.Reg.MustRegister(openConnections) + + updateDBStats := func() { + stats := ps.db.Stats() + maxConnections.Set(float64(stats.MaxOpenConnections)) + openConnections.WithLabelValues("0").Set(float64(stats.Idle)) + openConnections.WithLabelValues("1").Set(float64(stats.InUse)) + } + go func() { + <-ps.ready + if ps.db == nil { + return + } + updateDBStats() + for range time.Tick(time.Second) { + updateDBStats() + } + }() } // Close stops listening for new events and disconnects all clients. @@ -109,7 +197,7 @@ func (ps *pgEventSource) Run() { return } if ps.MaxOpenConns <= 0 { - ps.Logger.Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file") + ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file") } db.SetMaxOpenConns(ps.MaxOpenConns) if err = db.Ping(); err != nil { @@ -151,9 +239,9 @@ func (ps *pgEventSource) Run() { ps.lastQDelay = e.Ready.Sub(e.Received) ps.mtx.Lock() - atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks))) for sink := range ps.sinks { sink.channel <- e + ps.eventsOut.Inc() } ps.mtx.Unlock() } @@ -207,7 +295,7 @@ func (ps *pgEventSource) Run() { logger: ps.Logger, } ps.Logger.WithField("event", e).Debug("incoming") - atomic.AddUint64(&ps.eventsIn, 1) + ps.eventsIn.Inc() ps.queue <- e go e.Detail() } @@ -258,8 +346,6 @@ func (ps *pgEventSource) DebugStatus() interface{} { blocked += len(sink.channel) } return map[string]interface{}{ - "EventsIn": atomic.LoadUint64(&ps.eventsIn), - "EventsOut": atomic.LoadUint64(&ps.eventsOut), "Queue": len(ps.queue), "QueueLimit": cap(ps.queue), "QueueDelay": stats.Duration(ps.lastQDelay),