Merge branch '10669-safe-http-cache'
[arvados.git] / services / ws / event_source.go
index 622084c61227d5c0caad1986611c58ceccacba62..fe1876cc2788e9ccaaaff2f1f21d3116b3974b38 100644 (file)
@@ -43,6 +43,9 @@ type pgEventSource struct {
        eventsOut  uint64
 
        cancel func()
+
+       setupOnce sync.Once
+       ready     chan bool
 }
 
 var _ debugStatuser = (*pgEventSource)(nil)
@@ -63,12 +66,36 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
        ps.cancel()
 }
 
+func (ps *pgEventSource) setup() {
+       ps.ready = make(chan bool)
+}
+
+// Close stops listening for new events and disconnects all clients.
+func (ps *pgEventSource) Close() {
+       ps.WaitReady()
+       ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+func (ps *pgEventSource) WaitReady() {
+       ps.setupOnce.Do(ps.setup)
+       <-ps.ready
+}
+
 // Run listens for event notifications on the "logs" channel and sends
 // them to all subscribers.
 func (ps *pgEventSource) Run() {
        logger(nil).Debug("pgEventSource Run starting")
        defer logger(nil).Debug("pgEventSource Run finished")
 
+       ps.setupOnce.Do(ps.setup)
+       ready := ps.ready
+       defer func() {
+               if ready != nil {
+                       close(ready)
+               }
+       }()
+
        ctx, cancel := context.WithCancel(context.Background())
        ps.cancel = cancel
        defer cancel()
@@ -85,11 +112,11 @@ func (ps *pgEventSource) Run() {
 
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               logger(nil).WithError(err).Fatal("sql.Open failed")
+               logger(nil).WithError(err).Error("sql.Open failed")
                return
        }
        if err = db.Ping(); err != nil {
-               logger(nil).WithError(err).Fatal("db.Ping failed")
+               logger(nil).WithError(err).Error("db.Ping failed")
                return
        }
        ps.db = db
@@ -97,11 +124,16 @@ func (ps *pgEventSource) Run() {
        ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
        err = ps.pqListener.Listen("logs")
        if err != nil {
-               logger(nil).WithError(err).Fatal("pq Listen failed")
+               logger(nil).WithError(err).Error("pq Listen failed")
+               return
        }
        defer ps.pqListener.Close()
        logger(nil).Debug("pq Listen setup done")
 
+       close(ready)
+       // Avoid double-close in deferred func
+       ready = nil
+
        ps.queue = make(chan *event, ps.QueueSize)
        defer close(ps.queue)