Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / services / ws / event_source.go
index cfb828b2a5d84c6d16407866374e1f4900185f84..3a82bf62b3e9351a95d2abe4c56ae942fededa4c 100644 (file)
@@ -7,31 +7,17 @@ package main
 import (
        "context"
        "database/sql"
+       "errors"
+       "fmt"
        "strconv"
-       "strings"
        "sync"
        "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/stats"
+       "git.arvados.org/arvados.git/sdk/go/stats"
        "github.com/lib/pq"
 )
 
-type pgConfig map[string]string
-
-func (c pgConfig) ConnectionString() string {
-       s := ""
-       for k, v := range c {
-               s += k
-               s += "='"
-               s += strings.Replace(
-                       strings.Replace(v, `\`, `\\`, -1),
-                       `'`, `\'`, -1)
-               s += "' "
-       }
-       return s
-}
-
 type pgEventSource struct {
        DataSource   string
        MaxOpenConns int
@@ -182,11 +168,15 @@ func (ps *pgEventSource) Run() {
 
                case <-ticker.C:
                        logger(nil).Debug("listener ping")
-                       ps.pqListener.Ping()
+                       err := ps.pqListener.Ping()
+                       if err != nil {
+                               ps.listenerProblem(-1, fmt.Errorf("pqListener ping failed: %s", err))
+                               continue
+                       }
 
                case pqEvent, ok := <-ps.pqListener.Notify:
                        if !ok {
-                               logger(nil).Debug("pqListener Notify chan closed")
+                               logger(nil).Error("pqListener Notify chan closed")
                                return
                        }
                        if pqEvent == nil {
@@ -194,7 +184,7 @@ func (ps *pgEventSource) Run() {
                                // itself in addition to sending us a
                                // nil event, so this might be
                                // superfluous:
-                               ps.listenerProblem(-1, nil)
+                               ps.listenerProblem(-1, errors.New("pqListener Notify chan received nil event"))
                                continue
                        }
                        if pqEvent.Channel != "logs" {
@@ -288,7 +278,7 @@ func (sink *pgEventSink) Stop() {
                // Ensure this sink cannot fill up and block the
                // server-side queue (which otherwise could in turn
                // block our mtx.Lock() here)
-               for _ = range sink.channel {
+               for range sink.channel {
                }
        }()
        sink.source.mtx.Lock()