8460: Add systemd unit file.
[arvados.git] / services / ws / pg.go
index b6b064e49c2196a1b1b96ec5115a17913ce09d0c..dc899c56c0e63c43674ed52f464f859958a286f2 100644 (file)
@@ -6,6 +6,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/lib/pq"
@@ -39,6 +40,8 @@ type pgEventSource struct {
        shutdown   chan error
 
        lastQDelay time.Duration
+       eventsIn   uint64
+       eventsOut  uint64
 }
 
 func (ps *pgEventSource) setup() {
@@ -89,9 +92,11 @@ func (ps *pgEventSource) run() {
                                WithField("serial", e.Serial).
                                WithField("detail", e.Detail()).
                                Debug("event ready")
-                       ps.lastQDelay = time.Now().Sub(e.Received)
+                       e.Ready = time.Now()
+                       ps.lastQDelay = e.Ready.Sub(e.Received)
 
                        ps.mtx.Lock()
+                       atomic.AddUint64(&ps.eventsOut, uint64(len(ps.sinks)))
                        for sink := range ps.sinks {
                                sink.channel <- e
                        }
@@ -136,6 +141,7 @@ func (ps *pgEventSource) run() {
                                db:       ps.db,
                        }
                        logger(nil).WithField("event", e).Debug("incoming")
+                       atomic.AddUint64(&ps.eventsIn, 1)
                        ps.queue <- e
                        go e.Detail()
                }
@@ -174,6 +180,8 @@ func (ps *pgEventSource) Status() interface{} {
                blocked += len(sink.channel)
        }
        return map[string]interface{}{
+               "EventsIn":     atomic.LoadUint64(&ps.eventsIn),
+               "EventsOut":    atomic.LoadUint64(&ps.eventsOut),
                "Queue":        len(ps.queue),
                "QueueLimit":   cap(ps.queue),
                "QueueDelay":   fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),