X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b9b4502bcddeccd794614bf6979d643f9f350877..f672f727fe79bf6642a2daab641a1ef5c84648df:/services/ws/event_source.go diff --git a/services/ws/event_source.go b/services/ws/event_source.go index 7c1b58492d..309dab7a40 100644 --- a/services/ws/event_source.go +++ b/services/ws/event_source.go @@ -1,10 +1,13 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "context" "database/sql" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -13,21 +16,6 @@ import ( "github.com/lib/pq" ) -type pgConfig map[string]string - -func (c pgConfig) ConnectionString() string { - s := "" - for k, v := range c { - s += k - s += "='" - s += strings.Replace( - strings.Replace(v, `\`, `\\`, -1), - `'`, `\'`, -1) - s += "' " - } - return s -} - type pgEventSource struct { DataSource string MaxOpenConns int @@ -239,9 +227,17 @@ func (ps *pgEventSource) NewSink() eventSink { } func (ps *pgEventSource) DB() *sql.DB { + ps.WaitReady() return ps.db } +func (ps *pgEventSource) DBHealth() error { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + var i int + return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i) +} + func (ps *pgEventSource) DebugStatus() interface{} { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -257,6 +253,7 @@ func (ps *pgEventSource) DebugStatus() interface{} { "QueueDelay": stats.Duration(ps.lastQDelay), "Sinks": len(ps.sinks), "SinksBlocked": blocked, + "DBStats": ps.db.Stats(), } } @@ -275,7 +272,7 @@ func (sink *pgEventSink) Stop() { // Ensure this sink cannot fill up and block the // server-side queue (which otherwise could in turn // block our mtx.Lock() here) - for _ = range sink.channel { + for range sink.channel { } }() sink.source.mtx.Lock()