8460: Avoid log.Fatal once started.
author: Tom Clegg <tom@curoverse.com>
Mon, 14 Nov 2016 21:10:32 +0000 (16:10 -0500)
committer: Tom Clegg <tom@curoverse.com>
Tue, 15 Nov 2016 07:46:02 +0000 (02:46 -0500)
services/ws/pg.go

index e766f6c6b966b7a666fb9b53ffe9424b7f6dda3a..a5af9f765ba48f9187bb79322774cd252756fe1a 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "database/sql"
+       "fmt"
        "log"
        "strconv"
        "strings"
@@ -30,18 +31,18 @@ type pgEventSource struct {
        DataSource string
        QueueSize  int
 
+       db         *sql.DB
        pqListener *pq.Listener
        sinks      map[*pgEventSink]bool
        setupOnce  sync.Once
        mtx        sync.Mutex
+       shutdown   chan error
 }
 
 func (ps *pgEventSource) setup() {
+       ps.shutdown = make(chan error, 1)
        ps.sinks = make(map[*pgEventSink]bool)
-       go ps.run()
-}
 
-func (ps *pgEventSource) run() {
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
                log.Fatalf("sql.Open: %s", err)
@@ -49,23 +50,27 @@ func (ps *pgEventSource) run() {
        if err = db.Ping(); err != nil {
                log.Fatalf("db.Ping: %s", err)
        }
+       ps.db = db
 
-       listener := pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+       ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
                if err != nil {
                        // Until we have a mechanism for catching up
                        // on missed events, we cannot recover from a
                        // dropped connection without breaking our
                        // promises to clients.
-                       log.Fatalf("pgEventSource listener problem: %s", err)
+                       ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
                }
        })
-       err = listener.Listen("logs")
+       err = ps.pqListener.Listen("logs")
        if err != nil {
                log.Fatal(err)
        }
-
        debugLogf("pgEventSource listening")
 
+       go ps.run()
+}
+
+func (ps *pgEventSource) run() {
        eventQueue := make(chan *event, ps.QueueSize)
 
        go func() {
@@ -90,11 +95,18 @@ func (ps *pgEventSource) run() {
        defer ticker.Stop()
        for {
                select {
+               case err, ok := <-ps.shutdown:
+                       if ok {
+                               debugLogf("shutdown on error: %s", err)
+                       }
+                       close(eventQueue)
+                       return
+
                case <-ticker.C:
                        debugLogf("pgEventSource listener ping")
-                       listener.Ping()
+                       ps.pqListener.Ping()
 
-               case pqEvent, ok := <-listener.Notify:
+               case pqEvent, ok := <-ps.pqListener.Notify:
                        if !ok {
                                close(eventQueue)
                                return
@@ -112,7 +124,7 @@ func (ps *pgEventSource) run() {
                                LogID:    logID,
                                Received: time.Now(),
                                Serial:   serial,
-                               db:       db,
+                               db:       ps.db,
                        }
                        debugLogf("event %d %+v", e.Serial, e)
                        eventQueue <- e