11901: Require management token for health checks.
[arvados.git] / services / ws / event_source.go
index ed1ac0db97a29f8bd3c9e53791e00e2eda6565dd..6a308b3a62df3ac14c36d51ba8b08b7a7b7694f6 100644 (file)
@@ -29,8 +29,9 @@ func (c pgConfig) ConnectionString() string {
 }
 
 type pgEventSource struct {
-       DataSource string
-       QueueSize  int
+       DataSource   string
+       MaxOpenConns int
+       QueueSize    int
 
        db         *sql.DB
        pqListener *pq.Listener
@@ -70,9 +71,14 @@ func (ps *pgEventSource) setup() {
        ps.ready = make(chan bool)
 }
 
-// waitReady returns when private fields (cancel, db) are available
-// for tests to use.
-func (ps *pgEventSource) waitReady() {
+// Close stops listening for new events and disconnects all clients.
+func (ps *pgEventSource) Close() {
+       ps.WaitReady()
+       ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+func (ps *pgEventSource) WaitReady() {
        ps.setupOnce.Do(ps.setup)
        <-ps.ready
 }
@@ -84,6 +90,12 @@ func (ps *pgEventSource) Run() {
        defer logger(nil).Debug("pgEventSource Run finished")
 
        ps.setupOnce.Do(ps.setup)
+       ready := ps.ready
+       defer func() {
+               if ready != nil {
+                       close(ready)
+               }
+       }()
 
        ctx, cancel := context.WithCancel(context.Background())
        ps.cancel = cancel
@@ -101,11 +113,15 @@ func (ps *pgEventSource) Run() {
 
        db, err := sql.Open("postgres", ps.DataSource)
        if err != nil {
-               logger(nil).WithError(err).Fatal("sql.Open failed")
+               logger(nil).WithError(err).Error("sql.Open failed")
                return
        }
+       if ps.MaxOpenConns <= 0 {
+               logger(nil).Warn("no database connection limit configured -- consider setting PostgresPool>0 in arvados-ws configuration file")
+       }
+       db.SetMaxOpenConns(ps.MaxOpenConns)
        if err = db.Ping(); err != nil {
-               logger(nil).WithError(err).Fatal("db.Ping failed")
+               logger(nil).WithError(err).Error("db.Ping failed")
                return
        }
        ps.db = db
@@ -113,12 +129,15 @@ func (ps *pgEventSource) Run() {
        ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
        err = ps.pqListener.Listen("logs")
        if err != nil {
-               logger(nil).WithError(err).Fatal("pq Listen failed")
+               logger(nil).WithError(err).Error("pq Listen failed")
+               return
        }
        defer ps.pqListener.Close()
        logger(nil).Debug("pq Listen setup done")
 
-       close(ps.ready)
+       close(ready)
+       // Avoid double-close in deferred func
+       ready = nil
 
        ps.queue = make(chan *event, ps.QueueSize)
        defer close(ps.queue)
@@ -223,6 +242,12 @@ func (ps *pgEventSource) DB() *sql.DB {
        return ps.db
 }
 
+func (ps *pgEventSource) DBHealth() error {
+       ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       var i int
+       return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
+}
+
 func (ps *pgEventSource) DebugStatus() interface{} {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()