X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0fc6eaead0bf7691e99d19e74ec33636909001a7..fcf3ca5baf89bdd944e3a7dcdc1b65f8ff4945ca:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 4de1d559ef..a4e886872c 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -29,8 +33,9 @@ func (c pgConfig) ConnectionString() string { } type pgEventSource struct { - DataSource string - QueueSize int + DataSource string + MaxOpenConns int + QueueSize int db *sql.DB pqListener *pq.Listener @@ -43,6 +48,9 @@ type pgEventSource struct { eventsOut uint64 cancel func() + + setupOnce sync.Once + ready chan bool } var _ debugStatuser = (*pgEventSource)(nil) @@ -63,15 +71,40 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { ps.cancel() } +func (ps *pgEventSource) setup() { + ps.ready = make(chan bool) +} + +// Close stops listening for new events and disconnects all clients. +func (ps *pgEventSource) Close() { + ps.WaitReady() + ps.cancel() +} + +// WaitReady returns when the event listener is connected. +func (ps *pgEventSource) WaitReady() { + ps.setupOnce.Do(ps.setup) + <-ps.ready +} + +// Run listens for event notifications on the "logs" channel and sends +// them to all subscribers. func (ps *pgEventSource) Run() { logger(nil).Debug("pgEventSource Run starting") defer logger(nil).Debug("pgEventSource Run finished") + ps.setupOnce.Do(ps.setup) + ready := ps.ready + defer func() { + if ready != nil { + close(ready) + } + }() + ctx, cancel := context.WithCancel(context.Background()) ps.cancel = cancel defer cancel() - ps.sinks = make(map[*pgEventSink]bool) defer func() { // Disconnect all clients ps.mtx.Lock() @@ -84,11 +117,15 @@ func (ps *pgEventSource) Run() { db, err := sql.Open("postgres", ps.DataSource) if err != nil { - logger(nil).WithError(err).Fatal("sql.Open failed") + logger(nil).WithError(err).Error("sql.Open failed") return } + if ps.MaxOpenConns <= 0 { + logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file") + } + db.SetMaxOpenConns(ps.MaxOpenConns) if err = db.Ping(); err != nil { - logger(nil).WithError(err).Fatal("db.Ping failed") + logger(nil).WithError(err).Error("db.Ping failed") return } ps.db = db @@ -96,11 +133,16 @@ func (ps *pgEventSource) Run() { ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem) err = ps.pqListener.Listen("logs") if err != nil { - logger(nil).WithError(err).Fatal("pq Listen failed") + logger(nil).WithError(err).Error("pq Listen failed") + return } defer ps.pqListener.Close() logger(nil).Debug("pq Listen setup done") + close(ready) + // Avoid double-close in deferred func + ready = nil + ps.queue = make(chan *event, ps.QueueSize) defer close(ps.queue) @@ -192,6 +234,9 @@ func (ps *pgEventSource) NewSink() eventSink { source: ps, } ps.mtx.Lock() + if ps.sinks == nil { + ps.sinks = make(map[*pgEventSink]bool) + } ps.sinks[sink] = true ps.mtx.Unlock() return sink @@ -201,6 +246,12 @@ func (ps *pgEventSource) DB() *sql.DB { return ps.db } +func (ps *pgEventSource) DBHealth() error { + ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + var i int + return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i) +} + func (ps *pgEventSource) DebugStatus() interface{} { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -216,6 +267,7 @@ func (ps *pgEventSource) DebugStatus() interface{} { "QueueDelay": stats.Duration(ps.lastQDelay), "Sinks": len(ps.sinks), "SinksBlocked": blocked, + "DBStats": ps.db.Stats(), } }