"fmt"
"strconv"
"sync"
- "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/stats"
"github.com/lib/pq"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
MaxOpenConns int
QueueSize int
Logger logrus.FieldLogger
+ Reg *prometheus.Registry
db *sql.DB
pqListener *pq.Listener
mtx sync.Mutex
lastQDelay time.Duration
- eventsIn uint64
- eventsOut uint64
+ eventsIn prometheus.Counter
+ eventsOut prometheus.Counter
cancel func()
ready chan bool
}
-var _ debugStatuser = (*pgEventSource)(nil)
-
func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
if et == pq.ListenerEventConnected {
ps.Logger.Debug("pgEventSource connected")
+// setup creates the ready channel and registers this event source's
+// Prometheus metrics with ps.Reg: queue length/capacity/delay gauges,
+// sink gauges, events in/out counters, and database connection gauges.
+// It also starts a goroutine that refreshes the database connection
+// gauges once per second after ps.ready is signalled.
+//
+// The gauge callbacks are invoked by the metrics collector, i.e. on a
+// different goroutine than the event loop, so anything they read must
+// be safe for concurrent access.
func (ps *pgEventSource) setup() {
ps.ready = make(chan bool)
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "queue_len",
+ Help: "Current number of events in queue",
+ }, func() float64 { return float64(len(ps.queue)) }))
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "queue_cap",
+ Help: "Event queue capacity",
+ }, func() float64 { return float64(cap(ps.queue)) }))
+ // NOTE(review): ps.lastQDelay is read here without holding ps.mtx,
+ // and the visible writer assigns it before taking the lock --
+ // confirm whether this field needs synchronization.
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "queue_delay",
+ Help: "Queue delay of the last emitted event",
+ }, func() float64 { return ps.lastQDelay.Seconds() }))
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "sinks",
+ Help: "Number of active sinks (connections)",
+ }, func() float64 {
+ // ps.sinks is accessed under ps.mtx everywhere else in this
+ // file; take the lock here too so a metrics scrape does not
+ // race with sinks being added or removed.
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+ return float64(len(ps.sinks))
+ }))
+ ps.Reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "sinks_blocked",
+ Help: "Number of sinks (connections) that are busy and blocking the main event stream",
+ }, func() float64 {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+ // Sum of events currently buffered in each sink's channel.
+ blocked := 0
+ for sink := range ps.sinks {
+ blocked += len(sink.channel)
+ }
+ return float64(blocked)
+ }))
+ ps.eventsIn = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "events_in",
+ Help: "Number of events received from postgresql notify channel",
+ })
+ ps.Reg.MustRegister(ps.eventsIn)
+ ps.eventsOut = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "events_out",
+ Help: "Number of events sent to client sessions (before filtering)",
+ })
+ ps.Reg.MustRegister(ps.eventsOut)
+
+ maxConnections := prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "db_max_connections",
+ Help: "Maximum number of open connections to the database",
+ })
+ ps.Reg.MustRegister(maxConnections)
+ // The "inuse" label splits open connections into idle ("0") and
+ // in-use ("1").
+ openConnections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "db_open_connections",
+ Help: "Open connections to the database",
+ }, []string{"inuse"})
+ ps.Reg.MustRegister(openConnections)
+ go func() {
+ // Do not touch ps.db until ps.ready has been signalled; if
+ // there is still no database handle at that point there is
+ // nothing to report.
+ <-ps.ready
+ if ps.db == nil {
+ return
+ }
+ // NOTE(review): this loop has no stop condition, so the
+ // goroutine keeps polling once per second for the life of the
+ // process -- confirm that is acceptable after the source is
+ // closed.
+ for range time.Tick(time.Second) {
+ stats := ps.db.Stats()
+ maxConnections.Set(float64(stats.MaxOpenConnections))
+ openConnections.WithLabelValues("0").Set(float64(stats.Idle))
+ openConnections.WithLabelValues("1").Set(float64(stats.InUse))
+ }
+ }()
}
// Close stops listening for new events and disconnects all clients.
ps.lastQDelay = e.Ready.Sub(e.Received)
ps.mtx.Lock()
- atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
for sink := range ps.sinks {
sink.channel <- e
+ ps.eventsOut.Inc()
}
ps.mtx.Unlock()
}
logger: ps.Logger,
}
ps.Logger.WithField("event", e).Debug("incoming")
- atomic.AddUint64(&ps.eventsIn, 1)
+ ps.eventsIn.Inc()
ps.queue <- e
go e.Detail()
}
blocked += len(sink.channel)
}
return map[string]interface{}{
- "EventsIn": atomic.LoadUint64(&ps.eventsIn),
- "EventsOut": atomic.LoadUint64(&ps.eventsOut),
"Queue": len(ps.queue),
"QueueLimit": cap(ps.queue),
"QueueDelay": stats.Duration(ps.lastQDelay),
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
)
mux *http.ServeMux
setupOnce sync.Once
done chan struct{}
+ reg *prometheus.Registry
lastReqID int64
lastReqMtx sync.Mutex
}
+// setup registers the "sockets" gauge vector with rtr.reg, builds the
+// request handler from the cluster configuration, and installs the HTTP
+// routes (websocket endpoints, debug/status JSON, health checks) on a
+// new ServeMux.
func (rtr *router) setup() {
+ // One gauge per websocket API version, distinguished by the
+ // "version" label.
+ mSockets := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "ws",
+ Name: "sockets",
+ Help: "Number of connected sockets",
+ }, []string{"version"})
+ rtr.reg.MustRegister(mSockets)
+
rtr.handler = &handler{
PingTimeout: time.Duration(rtr.cluster.API.SendTimeout),
QueueSize: rtr.cluster.API.WebsocketClientEventQueue,
}
rtr.mux = http.NewServeMux()
- rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0))
- rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1))
rtr.mux.Handle("/debug.json", rtr.jsonHandler(rtr.DebugStatus))
rtr.mux.Handle("/status.json", rtr.jsonHandler(rtr.Status))
+ // Each websocket endpoint reports into its own labelled gauge:
+ // version "0" for /websocket, "1" for /arvados/v1/events.ws.
+ rtr.mux.Handle("/websocket", rtr.makeServer(newSessionV0, mSockets.WithLabelValues("0")))
+ rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(newSessionV1, mSockets.WithLabelValues("1")))
rtr.mux.Handle("/_health/", &health.Handler{
Token: rtr.cluster.ManagementToken,
Prefix: "/_health/",
})
}
-func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
+// makeServer returns a websocket.Server that serves connections using
+// sessions created by newSession. gauge is incremented when a client
+// connects and decremented after its connection has been closed, so it
+// reports the number of currently connected sockets for this endpoint.
+func (rtr *router) makeServer(newSession sessionFactory, gauge prometheus.Gauge) *websocket.Server {
return &websocket.Server{
Handshake: func(c *websocket.Config, r *http.Request) error {
return nil
t0 := time.Now()
logger := ctxlog.FromContext(ws.Request().Context())
logger.Info("connected")
+ // Use the gauge's own Inc/Dec rather than a separate counter
+ // followed by Set: an add-then-set pair can be interleaved by
+ // concurrent connections and leave a stale value in the gauge.
+ gauge.Inc()
stats := rtr.handler.Handle(ws, logger, rtr.eventSource,
func(ws wsConn, sendq chan<- interface{}) (session, error) {
"stats": stats,
}).Info("disconnect")
ws.Close()
+ gauge.Dec()
}),
}
}