X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2a469c4874895b05ee137e2382fd882680b3feb2..ba418300c50e1375ca9938562579b7bd6bf9490d:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 622084c612..341464de50 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -1,36 +1,29 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package ws import ( "context" "database/sql" + "errors" + "fmt" "strconv" - "strings" "sync" "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/stats" + "git.arvados.org/arvados.git/sdk/go/stats" "github.com/lib/pq" + "github.com/sirupsen/logrus" ) -type pgConfig map[string]string - -func (c pgConfig) ConnectionString() string { - s := "" - for k, v := range c { - s += k - s += "='" - s += strings.Replace( - strings.Replace(v, `\`, `\\`, -1), - `'`, `\'`, -1) - s += "' " - } - return s -} - type pgEventSource struct { - DataSource string - QueueSize int + DataSource string + MaxOpenConns int + QueueSize int + Logger logrus.FieldLogger db *sql.DB pqListener *pq.Listener @@ -43,31 +36,58 @@ type pgEventSource struct { eventsOut uint64 cancel func() + + setupOnce sync.Once + ready chan bool } var _ debugStatuser = (*pgEventSource)(nil) func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) { if et == pq.ListenerEventConnected { - logger(nil).Debug("pgEventSource connected") + ps.Logger.Debug("pgEventSource connected") return } // Until we have a mechanism for catching up on missed events, // we cannot recover from a dropped connection without // breaking our promises to clients. - logger(nil). + ps.Logger. WithField("eventType", et). WithError(err). Error("listener problem") ps.cancel() } +func (ps *pgEventSource) setup() { + ps.ready = make(chan bool) +} + +// Close stops listening for new events and disconnects all clients. +func (ps *pgEventSource) Close() { + ps.WaitReady() + ps.cancel() +} + +// WaitReady returns when the event listener is connected. +func (ps *pgEventSource) WaitReady() { + ps.setupOnce.Do(ps.setup) + <-ps.ready +} + // Run listens for event notifications on the "logs" channel and sends // them to all subscribers. func (ps *pgEventSource) Run() { - logger(nil).Debug("pgEventSource Run starting") - defer logger(nil).Debug("pgEventSource Run finished") + ps.Logger.Debug("pgEventSource Run starting") + defer ps.Logger.Debug("pgEventSource Run finished") + + ps.setupOnce.Do(ps.setup) + ready := ps.ready + defer func() { + if ready != nil { + close(ready) + } + }() ctx, cancel := context.WithCancel(context.Background()) ps.cancel = cancel @@ -85,11 +105,15 @@ func (ps *pgEventSource) Run() { db, err := sql.Open("postgres", ps.DataSource) if err != nil { - logger(nil).WithError(err).Fatal("sql.Open failed") + ps.Logger.WithError(err).Error("sql.Open failed") return } + if ps.MaxOpenConns <= 0 { + ps.Logger.Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file") + } + db.SetMaxOpenConns(ps.MaxOpenConns) if err = db.Ping(); err != nil { - logger(nil).WithError(err).Fatal("db.Ping failed") + ps.Logger.WithError(err).Error("db.Ping failed") return } ps.db = db @@ -97,10 +121,15 @@ func (ps *pgEventSource) Run() { ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem) err = ps.pqListener.Listen("logs") if err != nil { - logger(nil).WithError(err).Fatal("pq Listen failed") + ps.Logger.WithError(err).Error("pq Listen failed") + return } defer ps.pqListener.Close() - logger(nil).Debug("pq Listen setup done") + ps.Logger.Debug("pq Listen setup done") + + close(ready) + // Avoid double-close in deferred func + ready = nil ps.queue = make(chan *event, ps.QueueSize) defer close(ps.queue) @@ -114,7 +143,7 @@ func (ps *pgEventSource) Run() { // client_count X client_queue_size. e.Detail() - logger(nil). + ps.Logger. WithField("serial", e.Serial). WithField("detail", e.Detail()). Debug("event ready") @@ -136,16 +165,20 @@ func (ps *pgEventSource) Run() { for { select { case <-ctx.Done(): - logger(nil).Debug("ctx done") + ps.Logger.Debug("ctx done") return case <-ticker.C: - logger(nil).Debug("listener ping") - ps.pqListener.Ping() + ps.Logger.Debug("listener ping") + err := ps.pqListener.Ping() + if err != nil { + ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err)) + continue + } case pqEvent, ok := <-ps.pqListener.Notify: if !ok { - logger(nil).Debug("pqListener Notify chan closed") + ps.Logger.Error("pqListener Notify chan closed") return } if pqEvent == nil { @@ -153,16 +186,16 @@ func (ps *pgEventSource) Run() { // itself in addition to sending us a // nil event, so this might be // superfluous: - ps.listenerProblem(-1, nil) + ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event")) continue } if pqEvent.Channel != "logs" { - logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel") + ps.Logger.WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel") continue } logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64) if err != nil { - logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload") + ps.Logger.WithField("pqEvent", pqEvent).Error("bad notify payload") continue } serial++ @@ -171,8 +204,9 @@ func (ps *pgEventSource) Run() { Received: time.Now(), Serial: serial, db: ps.db, + logger: ps.Logger, } - logger(nil).WithField("event", e).Debug("incoming") + ps.Logger.WithField("event", e).Debug("incoming") atomic.AddUint64(&ps.eventsIn, 1) ps.queue <- e go e.Detail() @@ -202,9 +236,20 @@ func (ps *pgEventSource) NewSink() eventSink { } func (ps *pgEventSource) DB() *sql.DB { + ps.WaitReady() return ps.db } +func (ps *pgEventSource) DBHealth() error { + if ps.db == nil { + return errors.New("database not connected") + } + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + var i int + return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i) +} + func (ps *pgEventSource) DebugStatus() interface{} { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -220,6 +265,7 @@ func (ps *pgEventSource) DebugStatus() interface{} { "QueueDelay": stats.Duration(ps.lastQDelay), "Sinks": len(ps.sinks), "SinksBlocked": blocked, + "DBStats": ps.db.Stats(), } } @@ -238,7 +284,7 @@ func (sink *pgEventSink) Stop() { // Ensure this sink cannot fill up and block the // server-side queue (which otherwise could in turn // block our mtx.Lock() here) - for _ = range sink.channel { + for range sink.channel { } }() sink.source.mtx.Lock()