-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ws
import (
"database/sql"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
type eventSourceSuite struct{}
-func testDBConfig() (pgConfig, error) {
- var railsDB struct {
- Test struct {
- Database string
- Username string
- Password string
- Host string
- }
- }
- err := config.LoadFile(&railsDB, "../api/config/database.yml")
+func testDBConfig() arvados.PostgreSQLConnection {
+ cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
if err != nil {
- return nil, err
+ panic(err)
}
- cfg := pgConfig{
- "dbname": railsDB.Test.Database,
- "host": railsDB.Test.Host,
- "password": railsDB.Test.Password,
- "user": railsDB.Test.Username,
+ cc, err := cfg.GetCluster("zzzzz")
+ if err != nil {
+ panic(err)
}
- return cfg, nil
+ return cc.PostgreSQL.Connection
}
-func testDB() (*sql.DB, error) {
- cfg, err := testDBConfig()
+func testDB() *sql.DB {
+ db, err := sql.Open("postgres", testDBConfig().String())
if err != nil {
- return nil, err
+ panic(err)
}
- return sql.Open("postgres", cfg.ConnectionString())
+ return db
}
func (*eventSourceSuite) TestEventSource(c *check.C) {
- cfg, err := testDBConfig()
- if err != nil {
- c.Fatal(err)
- }
+ cfg := testDBConfig()
+ db := testDB()
pges := &pgEventSource{
- DataSource: cfg.ConnectionString(),
+ DataSource: cfg.String(),
QueueSize: 4,
+ Logger: ctxlog.TestLogger(c),
+ Reg: prometheus.NewRegistry(),
}
go pges.Run()
sinks := make([]eventSink, 18)
sinks[i] = pges.NewSink()
}
- // wait for listener to start, as evidenced by queue channel
- // appearing (relying on internal implementation detail here)
- for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) {
- c.Assert(time.Now().After(deadline), check.Equals, false)
- }
+ pges.WaitReady()
defer pges.cancel()
- db, err := testDB()
- c.Assert(err, check.IsNil)
-
done := make(chan bool, 1)
go func() {
for i := 0; i <= si; i++ {
ev := <-sinks[si].Channel()
c.Logf("sink %d received event %d", si, i)
- c.Check(ev.LogID, check.Equals, uint64(i))
+ c.Check(ev.LogID, check.Equals, int64(i))
row := ev.Detail()
if i == 0 {
// no matching row, null event
c.Check(row, check.IsNil)
} else {
c.Check(row, check.NotNil)
- c.Check(row.ID, check.Equals, uint64(i))
+ c.Check(row.ID, check.Equals, int64(i))
c.Check(row.UUID, check.Not(check.Equals), "")
}
}
case <-time.After(10 * time.Second):
c.Fatal("timed out")
}
+
+ c.Check(pges.DBHealth(), check.IsNil)
}