Merge branch '21461-excessive-scrollbars-fix'. Closes #21461
[arvados.git] / services / ws / event_source.go
index 341464de500cf784399f8df17b6d42acf4c4ebd2..e0269701c9715f5673650750dc7c00ba0b66159d 100644 (file)
@@ -11,19 +11,25 @@ import (
        "fmt"
        "strconv"
        "sync"
-       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/lib/pq"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
+var (
+       listenerPingInterval = time.Minute // period of the ticker that drives pqListener.Ping() in Run
+       testSlowPing         = false       // when true, Run sleeps half a second before each listener ping (presumably a test hook)
+)
+
 type pgEventSource struct {
        DataSource   string
        MaxOpenConns int
        QueueSize    int
        Logger       logrus.FieldLogger
+       Reg          *prometheus.Registry
 
        db         *sql.DB
        pqListener *pq.Listener
@@ -32,8 +38,8 @@ type pgEventSource struct {
        mtx        sync.Mutex
 
        lastQDelay time.Duration
-       eventsIn   uint64
-       eventsOut  uint64
+       eventsIn   prometheus.Counter
+       eventsOut  prometheus.Counter
 
        cancel func()
 
@@ -41,8 +47,6 @@ type pgEventSource struct {
        ready     chan bool
 }
 
-var _ debugStatuser = (*pgEventSource)(nil)
-
 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
        if et == pq.ListenerEventConnected {
                ps.Logger.Debug("pgEventSource connected")
@@ -61,6 +65,95 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
 
 func (ps *pgEventSource) setup() {
        ps.ready = make(chan bool)
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_len",
+                       Help:      "Current number of events in queue",
+               }, func() float64 { return float64(len(ps.queue)) }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_cap",
+                       Help:      "Event queue capacity",
+               }, func() float64 { return float64(cap(ps.queue)) }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_delay",
+                       Help:      "Queue delay of the last emitted event",
+               }, func() float64 { return ps.lastQDelay.Seconds() }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "sinks",
+                       Help:      "Number of active sinks (connections)",
+               }, func() float64 { return float64(len(ps.sinks)) }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "sinks_blocked",
+                       Help:      "Number of sinks (connections) that are busy and blocking the main event stream",
+               }, func() float64 {
+                       ps.mtx.Lock()
+                       defer ps.mtx.Unlock()
+                       blocked := 0
+                       for sink := range ps.sinks {
+                               blocked += len(sink.channel)
+                       }
+                       return float64(blocked)
+               }))
+       ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "events_in",
+               Help:      "Number of events received from postgresql notify channel",
+       })
+       ps.Reg.MustRegister(ps.eventsIn)
+       ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "events_out",
+               Help:      "Number of events sent to client sessions (before filtering)",
+       })
+       ps.Reg.MustRegister(ps.eventsOut)
+
+       maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "db_max_connections",
+               Help:      "Maximum number of open connections to the database",
+       })
+       ps.Reg.MustRegister(maxConnections)
+       openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "db_open_connections",
+               Help:      "Open connections to the database",
+       }, []string{"inuse"})
+       ps.Reg.MustRegister(openConnections)
+
+       updateDBStats := func() {
+               stats := ps.db.Stats()
+               maxConnections.Set(float64(stats.MaxOpenConnections))
+               openConnections.WithLabelValues("0").Set(float64(stats.Idle))
+               openConnections.WithLabelValues("1").Set(float64(stats.InUse))
+       }
+       go func() {
+               <-ps.ready
+               if ps.db == nil {
+                       return
+               }
+               updateDBStats()
+               for range time.Tick(time.Second) {
+                       updateDBStats()
+               }
+       }()
 }
 
 // Close stops listening for new events and disconnects all clients.
@@ -109,7 +202,7 @@ func (ps *pgEventSource) Run() {
                return
        }
        if ps.MaxOpenConns <= 0 {
-               ps.Logger.Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
+               ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file")
        }
        db.SetMaxOpenConns(ps.MaxOpenConns)
        if err = db.Ping(); err != nil {
@@ -151,31 +244,45 @@ func (ps *pgEventSource) Run() {
                        ps.lastQDelay = e.Ready.Sub(e.Received)
 
                        ps.mtx.Lock()
-                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
                        for sink := range ps.sinks {
                                sink.channel <- e
+                               ps.eventsOut.Inc()
                        }
                        ps.mtx.Unlock()
                }
        }()
 
        var serial uint64
-       ticker := time.NewTicker(time.Minute)
-       defer ticker.Stop()
+
+       go func() {
+               ticker := time.NewTicker(listenerPingInterval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-ctx.Done():
+                               ps.Logger.Debug("ctx done")
+                               return
+
+                       case <-ticker.C:
+                               ps.Logger.Debug("listener ping")
+                               if testSlowPing {
+                                       time.Sleep(time.Second / 2)
+                               }
+                               err := ps.pqListener.Ping()
+                               if err != nil {
+                                       ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
+                                       continue
+                               }
+                       }
+               }
+       }()
+
        for {
                select {
                case <-ctx.Done():
                        ps.Logger.Debug("ctx done")
                        return
 
-               case <-ticker.C:
-                       ps.Logger.Debug("listener ping")
-                       err := ps.pqListener.Ping()
-                       if err != nil {
-                               ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
-                               continue
-                       }
-
                case pqEvent, ok := <-ps.pqListener.Notify:
                        if !ok {
                                ps.Logger.Error("pqListener Notify chan closed")
@@ -193,7 +300,7 @@ func (ps *pgEventSource) Run() {
                                ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
                                continue
                        }
-                       logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+                       logID, err := strconv.ParseInt(pqEvent.Extra, 10, 64)
                        if err != nil {
                                ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload")
                                continue
@@ -207,7 +314,7 @@ func (ps *pgEventSource) Run() {
                                logger:   ps.Logger,
                        }
                        ps.Logger.WithField("event", e).Debug("incoming")
-                       atomic.AddUint64(&ps.eventsIn, 1)
+                       ps.eventsIn.Inc()
                        ps.queue <- e
                        go e.Detail()
                }
@@ -258,8 +365,6 @@ func (ps *pgEventSource) DebugStatus() interface{} {
                blocked += len(sink.channel)
        }
        return map[string]interface{}{
-               "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
-               "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
                "Queue":        len(ps.queue),
                "QueueLimit":   cap(ps.queue),
                "QueueDelay":   stats.Duration(ps.lastQDelay),