X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8a5ef4b1c8086fed020cb5d45552f26ec85f74ab..894e1a3d70e9ec1b3e8619d1822410d665fabab4:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 60d4d40aca..e0269701c9 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -19,6 +19,11 @@ import ( "github.com/sirupsen/logrus" ) +var ( + listenerPingInterval = time.Minute + testSlowPing = false +) + type pgEventSource struct { DataSource string MaxOpenConns int @@ -132,16 +137,21 @@ func (ps *pgEventSource) setup() { Help: "Open connections to the database", }, []string{"inuse"}) ps.Reg.MustRegister(openConnections) + + updateDBStats := func() { + stats := ps.db.Stats() + maxConnections.Set(float64(stats.MaxOpenConnections)) + openConnections.WithLabelValues("0").Set(float64(stats.Idle)) + openConnections.WithLabelValues("1").Set(float64(stats.InUse)) + } go func() { <-ps.ready if ps.db == nil { return } + updateDBStats() for range time.Tick(time.Second) { - stats := ps.db.Stats() - maxConnections.Set(float64(stats.MaxOpenConnections)) - openConnections.WithLabelValues("0").Set(float64(stats.Idle)) - openConnections.WithLabelValues("1").Set(float64(stats.InUse)) + updateDBStats() } }() } @@ -192,7 +202,7 @@ func (ps *pgEventSource) Run() { return } if ps.MaxOpenConns <= 0 { - ps.Logger.Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file") + ps.Logger.Warn("no database connection limit configured -- consider setting PostgreSQL.ConnectionPool>0 in arvados-ws configuration file") } db.SetMaxOpenConns(ps.MaxOpenConns) if err = db.Ping(); err != nil { @@ -243,22 +253,36 @@ func (ps *pgEventSource) Run() { }() var serial uint64 - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() + + go func() { + ticker := time.NewTicker(listenerPingInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + ps.Logger.Debug("ctx done") + return + + case <-ticker.C: + ps.Logger.Debug("listener ping") + if testSlowPing { + time.Sleep(time.Second / 2) + } + err := ps.pqListener.Ping() + if err != nil { + ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err)) + continue + } + } + } + }() + for { select { case <-ctx.Done(): ps.Logger.Debug("ctx done") return - case <-ticker.C: - ps.Logger.Debug("listener ping") - err := ps.pqListener.Ping() - if err != nil { - ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err)) - continue - } - case pqEvent, ok := <-ps.pqListener.Notify: if !ok { ps.Logger.Error("pqListener Notify chan closed") @@ -276,7 +300,7 @@ func (ps *pgEventSource) Run() { ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel") continue } - logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) + logID, err := strconv.ParseInt(pqEvent.Extra, 10, 64) if err != nil { ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload") continue