8460: Refactor "old events / other messages" mechanism to use the outgoing message...
[arvados.git] / services / ws / pg.go
index a5af9f765ba48f9187bb79322774cd252756fe1a..206cfeb73bce075ded3b7cf30d8b2363a7e418b8 100644 (file)
@@ -2,8 +2,6 @@ package main
 
 import (
        "database/sql"
-       "fmt"
-       "log"
        "strconv"
        "strings"
        "sync"
@@ -45,10 +43,10 @@ func (ps *pgEventSource) setup() {
 
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               log.Fatalf("sql.Open: %s", err)
+               logger(nil).WithError(err).Fatal("sql.Open failed")
        }
        if err = db.Ping(); err != nil {
-               log.Fatalf("db.Ping: %s", err)
+               logger(nil).WithError(err).Fatal("db.Ping failed")
        }
        ps.db = db
 
@@ -58,14 +56,15 @@ func (ps *pgEventSource) setup() {
                        // on missed events, we cannot recover from a
                        // dropped connection without breaking our
                        // promises to clients.
-                       ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+                       logger(nil).WithError(err).Error("listener problem")
+                       ps.shutdown <- err
                }
        })
        err = ps.pqListener.Listen("logs")
        if err != nil {
-               log.Fatal(err)
+               logger(nil).WithError(err).Fatal("pq Listen failed")
        }
-       debugLogf("pgEventSource listening")
+       logger(nil).Debug("pgEventSource listening")
 
        go ps.run()
 }
@@ -81,7 +80,12 @@ func (ps *pgEventSource) run() {
                        // concurrent queries would be bounded by
                        // client_count X client_queue_size.
                        e.Detail()
-                       debugLogf("event %d detail %+v", e.Serial, e.Detail())
+
+                       logger(nil).
+                               WithField("serial", e.Serial).
+                               WithField("detail", e.Detail()).
+                               Debug("event ready")
+
                        ps.mtx.Lock()
                        for sink := range ps.sinks {
                                sink.channel <- e
@@ -97,13 +101,13 @@ func (ps *pgEventSource) run() {
                select {
                case err, ok := <-ps.shutdown:
                        if ok {
-                               debugLogf("shutdown on error: %s", err)
+                               logger(nil).WithError(err).Info("shutdown")
                        }
                        close(eventQueue)
                        return
 
                case <-ticker.C:
-                       debugLogf("pgEventSource listener ping")
+                       logger(nil).Debug("listener ping")
                        ps.pqListener.Ping()
 
                case pqEvent, ok := <-ps.pqListener.Notify:
@@ -116,7 +120,7 @@ func (ps *pgEventSource) run() {
                        }
                        logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
                        if err != nil {
-                               log.Printf("bad notify payload: %+v", pqEvent)
+                               logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
                                continue
                        }
                        serial++
@@ -126,7 +130,7 @@ func (ps *pgEventSource) run() {
                                Serial:   serial,
                                db:       ps.db,
                        }
-                       debugLogf("event %d %+v", e.Serial, e)
+                       logger(nil).WithField("event", e).Debug("incoming")
                        eventQueue <- e
                        go e.Detail()
                }
@@ -152,6 +156,11 @@ func (ps *pgEventSource) NewSink() eventSink {
        return sink
 }
 
+func (ps *pgEventSource) DB() *sql.DB {
+       ps.setupOnce.Do(ps.setup)
+       return ps.db
+}
+
 type pgEventSink struct {
        channel chan *event
        source  *pgEventSource