Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/nokogiri...
[arvados.git] / services / ws / event_source.go
index 622084c61227d5c0caad1986611c58ceccacba62..3a82bf62b3e9351a95d2abe4c56ae942fededa4c 100644 (file)
@@ -1,36 +1,27 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
        "context"
        "database/sql"
+       "errors"
+       "fmt"
        "strconv"
-       "strings"
        "sync"
        "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/stats"
+       "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/lib/pq"
 )
 
-type pgConfig map[string]string
-
-func (c pgConfig) ConnectionString() string {
-       s := ""
-       for k, v := range c {
-               s += k
-               s += "='"
-               s += strings.Replace(
-                       strings.Replace(v, `\`, `\\`, -1),
-                       `'`, `\'`, -1)
-               s += "' "
-       }
-       return s
-}
-
 type pgEventSource struct {
-       DataSource string
-       QueueSize  int
+       DataSource   string
+       MaxOpenConns int
+       QueueSize    int
 
        db         *sql.DB
        pqListener *pq.Listener
@@ -43,6 +34,9 @@ type pgEventSource struct {
        eventsOut  uint64
 
        cancel func()
+
+       setupOnce sync.Once
+       ready     chan bool
 }
 
 var _ debugStatuser = (*pgEventSource)(nil)
@@ -63,12 +57,36 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
        ps.cancel()
 }
 
+func (ps *pgEventSource) setup() {
+       ps.ready = make(chan bool)
+}
+
+// Close stops listening for new events and disconnects all clients.
+func (ps *pgEventSource) Close() {
+       ps.WaitReady()
+       ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+func (ps *pgEventSource) WaitReady() {
+       ps.setupOnce.Do(ps.setup)
+       <-ps.ready
+}
+
 // Run listens for event notifications on the "logs" channel and sends
 // them to all subscribers.
 func (ps *pgEventSource) Run() {
        logger(nil).Debug("pgEventSource Run starting")
        defer logger(nil).Debug("pgEventSource Run finished")
 
+       ps.setupOnce.Do(ps.setup)
+       ready := ps.ready
+       defer func() {
+               if ready != nil {
+                       close(ready)
+               }
+       }()
+
        ctx, cancel := context.WithCancel(context.Background())
        ps.cancel = cancel
        defer cancel()
@@ -85,11 +103,15 @@ func (ps *pgEventSource) Run() {
 
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               logger(nil).WithError(err).Fatal("sql.Open failed")
+               logger(nil).WithError(err).Error("sql.Open failed")
                return
        }
+       if ps.MaxOpenConns <= 0 {
+               logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
+       }
+       db.SetMaxOpenConns(ps.MaxOpenConns)
        if err = db.Ping(); err != nil {
-               logger(nil).WithError(err).Fatal("db.Ping failed")
+               logger(nil).WithError(err).Error("db.Ping failed")
                return
        }
        ps.db = db
@@ -97,11 +119,16 @@ func (ps *pgEventSource) Run() {
        ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
        err = ps.pqListener.Listen("logs")
        if err != nil {
-               logger(nil).WithError(err).Fatal("pq Listen failed")
+               logger(nil).WithError(err).Error("pq Listen failed")
+               return
        }
        defer ps.pqListener.Close()
        logger(nil).Debug("pq Listen setup done")
 
+       close(ready)
+       // Avoid double-close in deferred func
+       ready = nil
+
        ps.queue = make(chan *event, ps.QueueSize)
        defer close(ps.queue)
 
@@ -141,11 +168,15 @@ func (ps *pgEventSource) Run() {
 
                case <-ticker.C:
                        logger(nil).Debug("listener ping")
-                       ps.pqListener.Ping()
+                       err := ps.pqListener.Ping()
+                       if err != nil {
+                               ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
+                               continue
+                       }
 
                case pqEvent, ok := <-ps.pqListener.Notify:
                        if !ok {
-                               logger(nil).Debug("pqListener Notify chan closed")
+                               logger(nil).Error("pqListener Notify chan closed")
                                return
                        }
                        if pqEvent == nil {
@@ -153,7 +184,7 @@ func (ps *pgEventSource) Run() {
                                // itself in addition to sending us a
                                // nil event, so this might be
                                // superfluous:
-                               ps.listenerProblem(-1, nil)
+                               ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
                                continue
                        }
                        if pqEvent.Channel != "logs" {
@@ -202,9 +233,17 @@ func (ps *pgEventSource) NewSink() eventSink {
 }
 
 func (ps *pgEventSource) DB() *sql.DB {
+       ps.WaitReady()
        return ps.db
 }
 
+func (ps *pgEventSource) DBHealth() error {
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       defer cancel()
+       var i int
+       return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
+}
+
 func (ps *pgEventSource) DebugStatus() interface{} {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
@@ -220,6 +259,7 @@ func (ps *pgEventSource) DebugStatus() interface{} {
                "QueueDelay":   stats.Duration(ps.lastQDelay),
                "Sinks":        len(ps.sinks),
                "SinksBlocked": blocked,
+               "DBStats":      ps.db.Stats(),
        }
 }
 
@@ -238,7 +278,7 @@ func (sink *pgEventSink) Stop() {
                // Ensure this sink cannot fill up and block the
                // server-side queue (which otherwise could in turn
                // block our mtx.Lock() here)
-               for _ = range sink.channel {
+               for range sink.channel {
                }
        }()
        sink.source.mtx.Lock()