X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f3e02106cfc33ffe333af9e303a9e68f3ecfb2e4..c5a8ad7751e13560a6cde34395ea76f380c8a80d:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index ea90ec7242..daf9a94cc1 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -1,6 +1,7 @@ package main import ( + "context" "database/sql" "strconv" "strings" @@ -28,58 +29,118 @@ func (c pgConfig) ConnectionString() string { } type pgEventSource struct { - DataSource string - QueueSize int + DataSource string + MaxOpenConns int + QueueSize int db *sql.DB pqListener *pq.Listener queue chan *event sinks map[*pgEventSink]bool - setupOnce sync.Once mtx sync.Mutex - shutdown chan error lastQDelay time.Duration eventsIn uint64 eventsOut uint64 + + cancel func() + + setupOnce sync.Once + ready chan bool } var _ debugStatuser = (*pgEventSource)(nil) +func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { + if et == pq.ListenerEventConnected { + logger(nil).Debug("pgEventSource connected") + return + } + + // Until we have a mechanism for catching up on missed events, + // we cannot recover from a dropped connection without + // breaking our promises to clients. + logger(nil). + WithField("eventType", et). + WithError(err). + Error("listener problem") + ps.cancel() +} + func (ps *pgEventSource) setup() { - ps.shutdown = make(chan error, 1) - ps.sinks = make(map[*pgEventSink]bool) + ps.ready = make(chan bool) +} + +// Close stops listening for new events and disconnects all clients. +func (ps *pgEventSource) Close() { + ps.WaitReady() + ps.cancel() +} + +// WaitReady returns when the event listener is connected. +func (ps *pgEventSource) WaitReady() { + ps.setupOnce.Do(ps.setup) + <-ps.ready +} + +// Run listens for event notifications on the "logs" channel and sends +// them to all subscribers. +func (ps *pgEventSource) Run() { + logger(nil).Debug("pgEventSource Run starting") + defer logger(nil).Debug("pgEventSource Run finished") + + ps.setupOnce.Do(ps.setup) + ready := ps.ready + defer func() { + if ready != nil { + close(ready) + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + ps.cancel = cancel + defer cancel() + + defer func() { + // Disconnect all clients + ps.mtx.Lock() + for sink := range ps.sinks { + close(sink.channel) + } + ps.sinks = nil + ps.mtx.Unlock() + }() db, err := sql.Open("postgres", ps.DataSource) if err != nil { - logger(nil).WithError(err).Fatal("sql.Open failed") + logger(nil).WithError(err).Error("sql.Open failed") + return } + if ps.MaxOpenConns <= 0 { + logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file") + } + db.SetMaxOpenConns(ps.MaxOpenConns) if err = db.Ping(); err != nil { - logger(nil).WithError(err).Fatal("db.Ping failed") + logger(nil).WithError(err).Error("db.Ping failed") + return } ps.db = db - ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { - if err != nil { - // Until we have a mechanism for catching up - // on missed events, we cannot recover from a - // dropped connection without breaking our - // promises to clients. - logger(nil).WithError(err).Error("listener problem") - ps.shutdown <- err - } - }) + ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem) err = ps.pqListener.Listen("logs") if err != nil { - logger(nil).WithError(err).Fatal("pq Listen failed") + logger(nil).WithError(err).Error("pq Listen failed") + return } - logger(nil).Debug("pgEventSource listening") + defer ps.pqListener.Close() + logger(nil).Debug("pq Listen setup done") - go ps.run() -} + close(ready) + // Avoid double-close in deferred func + ready = nil -func (ps *pgEventSource) run() { ps.queue = make(chan *event, ps.QueueSize) + defer close(ps.queue) go func() { for e := range ps.queue { @@ -111,11 +172,8 @@ func (ps *pgEventSource) run() { defer ticker.Stop() for { select { - case err, ok := <-ps.shutdown: - if ok { - logger(nil).WithError(err).Info("shutdown") - } - close(ps.queue) + case <-ctx.Done(): + logger(nil).Debug("ctx done") return case <-ticker.C: @@ -124,10 +182,19 @@ func (ps *pgEventSource) run() { case pqEvent, ok := <-ps.pqListener.Notify: if !ok { - close(ps.queue) + logger(nil).Debug("pqListener Notify chan closed") return } + if pqEvent == nil { + // pq should call listenerProblem + // itself in addition to sending us a + // nil event, so this might be + // superfluous: + ps.listenerProblem(-1, nil) + continue + } if pqEvent.Channel != "logs" { + logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel") continue } logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) @@ -158,22 +225,29 @@ func (ps *pgEventSource) run() { // quickly as possible because when one sink stops being ready, all // other sinks block. func (ps *pgEventSource) NewSink() eventSink { - ps.setupOnce.Do(ps.setup) sink := &pgEventSink{ channel: make(chan *event, 1), source: ps, } ps.mtx.Lock() + if ps.sinks == nil { + ps.sinks = make(map[*pgEventSink]bool) + } ps.sinks[sink] = true ps.mtx.Unlock() return sink } func (ps *pgEventSource) DB() *sql.DB { - ps.setupOnce.Do(ps.setup) return ps.db } +func (ps *pgEventSource) DBHealth() error { + ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + var i int + return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i) +} + func (ps *pgEventSource) DebugStatus() interface{} { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -189,6 +263,7 @@ func (ps *pgEventSource) DebugStatus() interface{} { "QueueDelay": stats.Duration(ps.lastQDelay), "Sinks": len(ps.sinks), "SinksBlocked": blocked, + "DBStats": ps.db.Stats(), } } @@ -201,6 +276,7 @@ func (sink *pgEventSink) Channel() <-chan *event { return sink.channel } +// Stop sending events to the sink's channel. func (sink *pgEventSink) Stop() { go func() { // Ensure this sink cannot fill up and block the @@ -210,7 +286,9 @@ func (sink *pgEventSink) Stop() { } }() sink.source.mtx.Lock() - delete(sink.source.sinks, sink) + if _, ok := sink.source.sinks[sink]; ok { + delete(sink.source.sinks, sink) + close(sink.channel) + } sink.source.mtx.Unlock() - close(sink.channel) }