16217: Add prometheus metrics.
author Tom Clegg <tom@tomclegg.ca>
Wed, 25 Mar 2020 20:23:41 +0000 (16:23 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Wed, 25 Mar 2020 20:23:41 +0000 (16:23 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

services/ws/event_source.go
services/ws/event_source_test.go
services/ws/router.go
services/ws/service.go

index 341464de500cf784399f8df17b6d42acf4c4ebd2..60d4d40aca67ee3b504a8dba6c71def29caa1ba5 100644 (file)
@@ -11,11 +11,11 @@ import (
        "fmt"
        "strconv"
        "sync"
-       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/lib/pq"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
@@ -24,6 +24,7 @@ type pgEventSource struct {
        MaxOpenConns int
        QueueSize    int
        Logger       logrus.FieldLogger
+       Reg          *prometheus.Registry
 
        db         *sql.DB
        pqListener *pq.Listener
@@ -32,8 +33,8 @@ type pgEventSource struct {
        mtx        sync.Mutex
 
        lastQDelay time.Duration
-       eventsIn   uint64
-       eventsOut  uint64
+       eventsIn   prometheus.Counter
+       eventsOut  prometheus.Counter
 
        cancel func()
 
@@ -41,8 +42,6 @@ type pgEventSource struct {
        ready     chan bool
 }
 
-var _ debugStatuser = (*pgEventSource)(nil)
-
 func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
        if et == pq.ListenerEventConnected {
                ps.Logger.Debug("pgEventSource connected")
@@ -61,6 +60,109 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
 
+// setup creates ps.ready and registers this event source's metrics
+// (arvados_ws_*) in ps.Reg, which must be non-nil. MustRegister panics
+// if ps.Reg already holds a metric with one of these names, so setup
+// must not run twice against the same registry.
 func (ps *pgEventSource) setup() {
         ps.ready = make(chan bool)
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_len",
+                       Help:      "Current number of events in queue",
+               }, func() float64 { return float64(len(ps.queue)) }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_cap",
+                       Help:      "Event queue capacity",
+               }, func() float64 { return float64(cap(ps.queue)) }))
+       // NOTE(review): Run assigns ps.lastQDelay without holding ps.mtx, so
+       // this gauge's read is not synchronized with that write.
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "queue_delay",
+                       Help:      "Queue delay of the last emitted event",
+               }, func() float64 { return ps.lastQDelay.Seconds() }))
+       // ps.sinks is guarded by ps.mtx (Run iterates it under the lock), so
+       // the sinks and sinks_blocked gauges take the lock before reading it.
+       // Run also holds ps.mtx while sending to each sink, so a scrape can
+       // wait behind a slow sink.
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "sinks",
+                       Help:      "Number of active sinks (connections)",
+               }, func() float64 {
+                       ps.mtx.Lock()
+                       defer ps.mtx.Unlock()
+                       return float64(len(ps.sinks))
+               }))
+       ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "ws",
+                       Name:      "sinks_blocked",
+                       Help:      "Number of sinks (connections) that are busy and blocking the main event stream",
+               }, func() float64 {
+                       ps.mtx.Lock()
+                       defer ps.mtx.Unlock()
+                       blocked := 0
+                       for sink := range ps.sinks {
+                               blocked += len(sink.channel)
+                       }
+                       return float64(blocked)
+               }))
+       // Counters incremented in Run: eventsIn once per notification
+       // received, eventsOut once per event sent to a sink.
+       ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "events_in",
+               Help:      "Number of events received from postgresql notify channel",
+       })
+       ps.Reg.MustRegister(ps.eventsIn)
+       ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "events_out",
+               Help:      "Number of events sent to client sessions (before filtering)",
+       })
+       ps.Reg.MustRegister(ps.eventsOut)
+
+       maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "db_max_connections",
+               Help:      "Maximum number of open connections to the database",
+       })
+       ps.Reg.MustRegister(maxConnections)
+       openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "db_open_connections",
+               Help:      "Open connections to the database",
+       }, []string{"inuse"})
+       ps.Reg.MustRegister(openConnections)
+       // Once ps.ready yields, publish database pool statistics every second
+       // (skipped entirely if ps.db is still nil). NOTE(review): the ticker is
+       // never stopped, so this goroutine runs until the process exits.
+       go func() {
+               <-ps.ready
+               if ps.db == nil {
+                       return
+               }
+               for range time.Tick(time.Second) {
+                       stats := ps.db.Stats()
+                       maxConnections.Set(float64(stats.MaxOpenConnections))
+                       openConnections.WithLabelValues("0").Set(float64(stats.Idle))
+                       openConnections.WithLabelValues("1").Set(float64(stats.InUse))
+               }
+       }()
 }
 
 // Close stops listening for new events and disconnects all clients.
@@ -151,9 +234,9 @@ func (ps *pgEventSource) Run() {
                        ps.lastQDelay = e.Ready.Sub(e.Received)
 
                        ps.mtx.Lock()
-                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
                        for sink := range ps.sinks {
                                sink.channel <- e
+                               ps.eventsOut.Inc()
                        }
                        ps.mtx.Unlock()
                }
@@ -207,7 +290,7 @@ func (ps *pgEventSource) Run() {
                                logger:   ps.Logger,
                        }
                        ps.Logger.WithField("event", e).Debug("incoming")
-                       atomic.AddUint64(&ps.eventsIn, 1)
+                       ps.eventsIn.Inc()
                        ps.queue <- e
                        go e.Detail()
                }
@@ -258,8 +341,6 @@ func (ps *pgEventSource) DebugStatus() interface{} {
                blocked += len(sink.channel)
        }
        return map[string]interface{}{
-               "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
-               "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
                "Queue":        len(ps.queue),
                "QueueLimit":   cap(ps.queue),
                "QueueDelay":   stats.Duration(ps.lastQDelay),
index dd40835b6e56f5a3ed5ff07c91e32fbe4b920882..9af4c2bf9d7e2a6abc5f16e011b801ff8f479554 100644 (file)
@@ -12,9 +12,12 @@ import (
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
+var _ debugStatuser = (*pgEventSource)(nil)
+
 var _ = check.Suite(&eventSourceSuite{})
 
 type eventSourceSuite struct{}
@@ -46,6 +49,7 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
                DataSource: cfg.String(),
                QueueSize:  4,
                Logger:     ctxlog.TestLogger(c),
+               Reg:        prometheus.NewRegistry(),
        }
        go pges.Run()
        sinks := make([]eventSink, 18)
index b3403dabd00a3740d87f3136a21076ac4640fdc1..5f40143fcde0ddc2e33ac027259c3ccdb67ff9f0 100644 (file)
@@ -17,6 +17,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/net/websocket"
 )
@@ -38,6 +39,7 @@ type router struct {
        mux       *http.ServeMux
        setupOnce sync.Once
        done      chan struct{}
+       reg       *prometheus.Registry
 
        lastReqID  int64
        lastReqMtx sync.Mutex
@@ -55,16 +57,24 @@ type debugStatuser interface {
 }
 
 func (rtr *router) setup() {
+       mSockets := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "ws",
+               Name:      "sockets",
+               Help:      "Number of connected sockets",
+       }, []string{"version"})
+       rtr.reg.MustRegister(mSockets)
+
        rtr.handler = &handler{
                PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
                QueueSize:   rtr.cluster.API.WebsocketClientEventQueue,
        }
        rtr.mux = http.NewServeMux()
-       rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
-       rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
        rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
        rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
 
+       rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0, mSockets.WithLabelValues("0")))
+       rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1, mSockets.WithLabelValues("1")))
        rtr.mux.Handle("/_health/", &health.Handler{
                Token:  rtr.cluster.ManagementToken,
                Prefix: "/_health/",
@@ -79,7 +89,8 @@ func (rtr *router) setup() {
        })
 }
 
-func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+func (rtr *router) makeServer(newSession sessionFactory, gauge prometheus.Gauge) *websocket.Server {
+       // gauge counts the sockets currently connected through this server.
         return &websocket.Server{
                 Handshake: func(c *websocket.Config, r *http.Request) error {
                         return nil
@@ -88,6 +99,8 @@ func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
                         t0 := time.Now()
                         logger := ctxlog.FromContext(ws.Request().Context())
                         logger.Info("connected")
+                       // Gauge.Inc is atomic, so concurrent handlers cannot publish a stale count.
+                       gauge.Inc()
 
                         stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
                                 func(ws wsConn, sendq chan<- interface{}) (session, error) {
@@ -99,6 +112,8 @@ func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
                                 "stats":   stats,
                         }).Info("disconnect")
                         ws.Close()
+                       // Dec pairs with the Inc above, once the socket is closed.
+                       gauge.Dec()
                 }),
         }
 }
index c38dcf59e68aac5a975f3362e7b258310341aef5..761e22e16c2cd4fd5025341e9ad284eb74b6f8c7 100644 (file)
@@ -29,6 +29,7 @@ func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg
                MaxOpenConns: cluster.PostgreSQL.ConnectionPool,
                QueueSize:    cluster.API.WebsocketServerEventQueue,
                Logger:       ctxlog.FromContext(ctx),
+               Reg:          reg,
        }
        done := make(chan struct{})
        go func() {
@@ -46,6 +47,7 @@ func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg
                eventSource:    eventSource,
                newPermChecker: func() permChecker { return newPermChecker(*client) },
                done:           done,
+               reg:            reg,
        }
        return rtr
 }