X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c86cbaa6f286e50900dae3203a42044449e042f7..1667f9860de21d29bbe32bb827db29eca62d9aeb:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 622084c612..fe1876cc27 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -43,6 +43,9 @@ type pgEventSource struct { eventsOut uint64 cancel func() + + setupOnce sync.Once + ready chan bool } var _ debugStatuser = (*pgEventSource)(nil) @@ -63,12 +66,36 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { ps.cancel() } +func (ps *pgEventSource) setup() { + ps.ready = make(chan bool) +} + +// Close stops listening for new events and disconnects all clients. It calls WaitReady first, so it blocks until Run has closed the ready channel, and only then invokes the cancel func that Run installed. +func (ps *pgEventSource) Close() { + ps.WaitReady() + ps.cancel() +} + +// WaitReady blocks until Run closes the ready channel: either once the event listener is connected, or when Run returns early because database/listener setup failed. It creates the channel via setupOnce if Run has not done so yet. +func (ps *pgEventSource) WaitReady() { + ps.setupOnce.Do(ps.setup) + <-ps.ready +} + // Run listens for event notifications on the "logs" channel and sends // them to all subscribers. 
func (ps *pgEventSource) Run() { logger(nil).Debug("pgEventSource Run starting") defer logger(nil).Debug("pgEventSource Run finished") + ps.setupOnce.Do(ps.setup) + ready := ps.ready + defer func() { + if ready != nil { + close(ready) + } + }() + ctx, cancel := context.WithCancel(context.Background()) ps.cancel = cancel defer cancel() @@ -85,11 +112,11 @@ func (ps *pgEventSource) Run() { db, err := sql.Open("postgres", ps.DataSource) if err != nil { - logger(nil).WithError(err).Fatal("sql.Open failed") + logger(nil).WithError(err).Error("sql.Open failed") return } if err = db.Ping(); err != nil { - logger(nil).WithError(err).Fatal("db.Ping failed") + logger(nil).WithError(err).Error("db.Ping failed") return } ps.db = db @@ -97,11 +124,16 @@ func (ps *pgEventSource) Run() { ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem) err = ps.pqListener.Listen("logs") if err != nil { - logger(nil).WithError(err).Fatal("pq Listen failed") + logger(nil).WithError(err).Error("pq Listen failed") + return } defer ps.pqListener.Close() logger(nil).Debug("pq Listen setup done") + close(ready) + // Avoid double-close in deferred func + ready = nil + ps.queue = make(chan *event, ps.QueueSize) defer close(ps.queue)