8460: Add systemd unit file.
[arvados.git] / services / ws / pg.go
index 5e8e63e01fc3ec27ccffbb20875d1c5df79639db..dc899c56c0e63c43674ed52f464f859958a286f2 100644 (file)
@@ -2,10 +2,11 @@ package main
 
 import (
        "database/sql"
-       "log"
+       "fmt"
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/lib/pq"
@@ -27,58 +28,97 @@ func (c pgConfig) ConnectionString() string {
 }
 
 type pgEventSource struct {
-       PgConfig  pgConfig
-       QueueSize int
+       DataSource string
+       QueueSize  int
 
+       db         *sql.DB
        pqListener *pq.Listener
+       queue      chan *event
        sinks      map[*pgEventSink]bool
        setupOnce  sync.Once
        mtx        sync.Mutex
+       shutdown   chan error
+
+       // lastQDelay is written by run()'s consumer goroutine and read
+       // by Status(); both hold mtx while accessing it.
+       lastQDelay time.Duration
+       eventsIn   uint64
+       eventsOut  uint64
 }
 
+// setup opens the database connection and the "logs" listener,
+// initializes the sinks map, shutdown channel, and event queue, and
+// then starts run() in a new goroutine. Failures are logged at Fatal
+// level. It is meant to run only once, via ps.setupOnce.
 func (ps *pgEventSource) setup() {
+       ps.shutdown = make(chan error, 1)
        ps.sinks = make(map[*pgEventSink]bool)
+       // Create the queue before starting the run() goroutine, so
+       // the assignment does not race with Status() calls made
+       // after setup has completed.
+       ps.queue = make(chan *event, ps.QueueSize)
-       go ps.run()
-}
 
-func (ps *pgEventSource) run() {
-       db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
+       db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               log.Fatal(err)
+               logger(nil).WithError(err).Fatal("sql.Open failed")
+       }
+       if err = db.Ping(); err != nil {
+               logger(nil).WithError(err).Fatal("db.Ping failed")
        }
+       ps.db = db
 
-       listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+       ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
                if err != nil {
                        // Until we have a mechanism for catching up
                        // on missed events, we cannot recover from a
                        // dropped connection without breaking our
                        // promises to clients.
-                       log.Fatalf("pgEventSource listener problem: %s", err)
+                       logger(nil).WithError(err).Error("listener problem")
+                       // Never block here: lib/pq calls this from the
+                       // goroutine that delivers notifications, and
+                       // run() stops reading ps.shutdown after the
+                       // first error, so one buffered error suffices.
+                       select {
+                       case ps.shutdown <- err:
+                       default:
+                       }
                }
        })
-       err = listener.Listen("logs")
+       err = ps.pqListener.Listen("logs")
        if err != nil {
-               log.Fatal(err)
+               logger(nil).WithError(err).Fatal("pq Listen failed")
        }
-       debugLogf("pgEventSource listening")
-       go func() {
-               for _ = range time.NewTicker(time.Minute).C {
-                       debugLogf("pgEventSource listener ping")
-                       listener.Ping()
-               }
-       }()
+       logger(nil).Debug("pgEventSource listening")
+
+       go ps.run()
+}
+
+// run forwards each "logs" notification with a numeric payload from
+// ps.pqListener to ps.queue, and pings the listener once a minute.
+// It closes ps.queue and returns when the Notify channel is closed
+// or an error arrives on ps.shutdown. A goroutine started here
+// delivers queued events to every registered sink.
+func (ps *pgEventSource) run() {
+       // ps.queue was already created by setup().
 
-       eventQueue := make(chan *event, ps.QueueSize)
        go func() {
-               for e := range eventQueue {
+               for e := range ps.queue {
                        // Wait for the "select ... from logs" call to
                        // finish. This limits max concurrent queries
                        // to ps.QueueSize. Without this, max
                        // concurrent queries would be bounded by
                        // client_count X client_queue_size.
                        e.Detail()
-                       debugLogf("event %d detail %+v", e.Serial, e.Detail())
+
+                       logger(nil).
+                               WithField("serial", e.Serial).
+                               WithField("detail", e.Detail()).
+                               Debug("event ready")
+                       e.Ready = time.Now()
+
                        ps.mtx.Lock()
+                       ps.lastQDelay = e.Ready.Sub(e.Received)
+                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
                        for sink := range ps.sinks {
                                sink.channel <- e
                        }
@@ -87,25 +127,46 @@ func (ps *pgEventSource) run() {
        }()
 
        var serial uint64
-       for pqEvent := range listener.Notify {
-               if pqEvent.Channel != "logs" {
-                       continue
-               }
-               logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
-               if err != nil {
-                       log.Printf("bad notify payload: %+v", pqEvent)
-                       continue
-               }
-               serial++
-               e := &event{
-                       LogID:    logID,
-                       Received: time.Now(),
-                       Serial:   serial,
-                       db:       db,
+       ticker := time.NewTicker(time.Minute)
+       defer ticker.Stop()
+       for {
+               select {
+               case err, ok := <-ps.shutdown:
+                       if ok {
+                               logger(nil).WithError(err).Info("shutdown")
+                       }
+                       close(ps.queue)
+                       return
+
+               case <-ticker.C:
+                       logger(nil).Debug("listener ping")
+                       ps.pqListener.Ping()
+
+               case pqEvent, ok := <-ps.pqListener.Notify:
+                       if !ok {
+                               close(ps.queue)
+                               return
+                       }
+                       if pqEvent.Channel != "logs" {
+                               continue
+                       }
+                       logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+                       if err != nil {
+                               logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
+                               continue
+                       }
+                       serial++
+                       e := &event{
+                               LogID:    logID,
+                               Received: time.Now(),
+                               Serial:   serial,
+                               db:       ps.db,
+                       }
+                       logger(nil).WithField("event", e).Debug("incoming")
+                       atomic.AddUint64(&ps.eventsIn, 1)
+                       ps.queue <- e
+                       go e.Detail()
                }
-               debugLogf("event %d %+v", e.Serial, e)
-               eventQueue <- e
-               go e.Detail()
        }
 }
 
@@ -128,6 +189,35 @@ func (ps *pgEventSource) NewSink() eventSink {
        return sink
 }
 
+// DB returns the database handle opened by setup(), running setup
+// (via ps.setupOnce) first if it has not run yet.
+func (ps *pgEventSource) DB() *sql.DB {
+       ps.setupOnce.Do(ps.setup)
+       return ps.db
+}
+
+// Status returns a snapshot for status reporting: event counters,
+// current queue length and capacity, the most recent queue delay in
+// seconds, the number of sinks, and the total number of events
+// waiting in sink channels ("SinksBlocked").
+func (ps *pgEventSource) Status() interface{} {
+       ps.mtx.Lock()
+       defer ps.mtx.Unlock()
+       blocked := 0
+       for sink := range ps.sinks {
+               blocked += len(sink.channel)
+       }
+       return map[string]interface{}{
+               "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
+               "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
+               "Queue":        len(ps.queue),
+               "QueueLimit":   cap(ps.queue),
+               "QueueDelay":   fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),
+               "Sinks":        len(ps.sinks),
+               "SinksBlocked": blocked,
+       }
+}
+
 type pgEventSink struct {
        channel chan *event
        source  *pgEventSource