"context"
"database/sql"
"strconv"
- "strings"
"sync"
"sync/atomic"
"time"
"github.com/lib/pq"
)
-type pgConfig map[string]string
-
-func (c pgConfig) ConnectionString() string {
- s := ""
- for k, v := range c {
- s += k
- s += "='"
- s += strings.Replace(
- strings.Replace(v, `\`, `\\`, -1),
- `'`, `\'`, -1)
- s += "' "
- }
- return s
-}
-
type pgEventSource struct {
DataSource string
MaxOpenConns int
}
func (ps *pgEventSource) DBHealth() error {
- ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ defer cancel()
var i int
return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
}
// Ensure this sink cannot fill up and block the
// server-side queue (which otherwise could in turn
// block our mtx.Lock() here)
- for _ = range sink.channel {
+ for range sink.channel {
}
}()
sink.source.mtx.Lock()