X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0f5b0542513b572959e39400bae42e69aeb1a7b6..b34c4b9234777d68b675aebf77680b8dd8708a6d:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 923a341b75..e0269701c9 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -19,6 +19,11 @@ import ( "github.com/sirupsen/logrus" ) +var ( + listenerPingInterval = time.Minute + testSlowPing = false +) + type pgEventSource struct { DataSource string MaxOpenConns int @@ -248,22 +253,36 @@ func (ps *pgEventSource) Run() { }() var serial uint64 - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() + + go func() { + ticker := time.NewTicker(listenerPingInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + ps.Logger.Debug("ctx done") + return + + case <-ticker.C: + ps.Logger.Debug("listener ping") + if testSlowPing { + time.Sleep(time.Second / 2) + } + err := ps.pqListener.Ping() + if err != nil { + ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err)) + continue + } + } + } + }() + for { select { case <-ctx.Done(): ps.Logger.Debug("ctx done") return - case <-ticker.C: - ps.Logger.Debug("listener ping") - err := ps.pqListener.Ping() - if err != nil { - ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err)) - continue - } - case pqEvent, ok := <-ps.pqListener.Notify: if !ok { ps.Logger.Error("pqListener Notify chan closed")