X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/44c95f99098fa6c6acbfa82d4b6cbc6015eb6e39..9ebf73b1a1229bba507057ed2fb6a39635ce7e24:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index a4e886872c..3a82bf62b3 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -7,31 +7,17 @@ package main import ( "context" "database/sql" + "errors" + "fmt" "strconv" - "strings" "sync" "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/stats" + "git.arvados.org/arvados.git/sdk/go/stats" "github.com/lib/pq" ) -type pgConfig map[string]string - -func (c pgConfig) ConnectionString() string { - s := "" - for k, v := range c { - s += k - s += "='" - s += strings.Replace( - strings.Replace(v, `\`, `\\`, -1), - `'`, `\'`, -1) - s += "' " - } - return s -} - type pgEventSource struct { DataSource string MaxOpenConns int @@ -182,11 +168,15 @@ func (ps *pgEventSource) Run() { case <-ticker.C: logger(nil).Debug("listener ping") - ps.pqListener.Ping() + err := ps.pqListener.Ping() + if err != nil { + ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err)) + continue + } case pqEvent, ok := <-ps.pqListener.Notify: if !ok { - logger(nil).Debug("pqListener Notify chan closed") + logger(nil).Error("pqListener Notify chan closed") return } if pqEvent == nil { @@ -194,7 +184,7 @@ func (ps *pgEventSource) Run() { // itself in addition to sending us a // nil event, so this might be // superfluous: - ps.listenerProblem(-1, nil) + ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event")) continue } if pqEvent.Channel != "logs" { @@ -243,11 +233,13 @@ func (ps *pgEventSource) NewSink() eventSink { } func (ps *pgEventSource) DB() *sql.DB { + ps.WaitReady() return ps.db } func (ps *pgEventSource) DBHealth() error { - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() var i int return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i) } @@ -286,7 +278,7 @@ func (sink *pgEventSink) Stop() { // Ensure this sink cannot fill up and block the // server-side queue (which otherwise could in turn // block our mtx.Lock() here) - for _ = range sink.channel { + for range sink.channel { } }() sink.source.mtx.Lock()