-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ws
import (
"database/sql"
"fmt"
+ "io/ioutil"
+ "sort"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
// eventSourceSuite is the receiver type for the event source tests in
// this file (see TestEventSource). It carries no state.
type eventSourceSuite struct{}
// testDBConfig returns the PostgreSQL connection settings used by the
// tests in this package.
//
// The removed implementation loaded ../api/config/database.yml, took the
// Database/Username/Password/Host fields of its Test entry, and returned
// them as a pgConfig together with an error. The added implementation
// loads arvados.DefaultConfigFile, looks up cluster "zzzzz", and returns
// that cluster's PostgreSQL.Connection. It has no error return: a failure
// to load the config or to find the cluster panics.
-func testDBConfig() (pgConfig, error) {
- var railsDB struct {
- Test struct {
- Database string
- Username string
- Password string
- Host string
- }
- }
- err := config.LoadFile(&railsDB, "../api/config/database.yml")
+func testDBConfig() arvados.PostgreSQLConnection {
+ cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
if err != nil {
- return nil, err
+ panic(err)
}
- cfg := pgConfig{
- "dbname": railsDB.Test.Database,
- "host": railsDB.Test.Host,
- "password": railsDB.Test.Password,
- "user": railsDB.Test.Username,
// NOTE(review): "zzzzz" is presumably the cluster ID defined by the test
// configuration file -- confirm against the test environment setup.
+ cc, err := cfg.GetCluster("zzzzz")
+ if err != nil {
+ panic(err)
}
- return cfg, nil
+ return cc.PostgreSQL.Connection
}
// testDB opens a database/sql handle with the "postgres" driver, using
// the connection string produced by testDBConfig().
//
// The removed version returned (*sql.DB, error). The added version
// returns only the *sql.DB and panics if sql.Open reports an error.
//
// NOTE(review): no postgres driver import is visible in this file's
// import block; presumably the driver is registered elsewhere in the
// package -- confirm.
-func testDB() (*sql.DB, error) {
- cfg, err := testDBConfig()
+func testDB() *sql.DB {
+ db, err := sql.Open("postgres", testDBConfig().String())
if err != nil {
- return nil, err
+ panic(err)
}
- return sql.Open("postgres", cfg.ConnectionString())
+ return db
}
// TestEventSource starts a pgEventSource, attaches several sinks, sends a
// NOTIFY on the "logs" channel for each log fixture ID, and checks that
// the sinks receive events whose LogID and detail row match those IDs.
//
// NOTE(review): some lines of this function are not visible in this view
// (the loop header around NewSink, the end of the NOTIFY goroutine, and
// the wait/select header), so the comments below describe only what the
// visible lines show.
func (*eventSourceSuite) TestEventSource(c *check.C) {
- cfg, err := testDBConfig()
- if err != nil {
- c.Fatal(err)
// Read the log fixtures and collect their numeric IDs; only the ID field
// of each fixture entry is decoded.
+ var logfixtures map[string]struct {
+ ID int
+ }
+ yamldata, err := ioutil.ReadFile("../api/test/fixtures/logs.yml")
+ c.Assert(err, check.IsNil)
+ c.Assert(yaml.Unmarshal(yamldata, &logfixtures), check.IsNil)
+ var logIDs []int
+ for _, logfixture := range logfixtures {
+ logIDs = append(logIDs, logfixture.ID)
}
// Map iteration order is unspecified, so sort the IDs to get a stable,
// ascending sequence for sending and for the per-sink checks.
+ sort.Ints(logIDs)
+
+ cfg := testDBConfig()
+ db := testDB()
pges := &pgEventSource{
- DataSource: cfg.ConnectionString(),
+ DataSource: cfg.String(),
QueueSize: 4,
+ Logger: ctxlog.TestLogger(c),
+ Reg: prometheus.NewRegistry(),
}
// Run the event source in the background for the duration of the test.
go pges.Run()
// 18 sinks are allocated; each slot is filled from pges.NewSink().
sinks := make([]eventSink, 18)
sinks[i] = pges.NewSink()
}
- // wait for listener to start, as evidenced by queue channel
- // appearing (relying on internal implementation detail here)
- for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) {
- c.Assert(time.Now().After(deadline), check.Equals, false)
- }
// WaitReady replaces the removed polling loop on pges.queue; presumably
// it blocks until the listener has started -- confirm in pgEventSource.
+ pges.WaitReady()
defer pges.cancel()
- db, err := testDB()
- c.Assert(err, check.IsNil)
-
done := make(chan bool, 1)
// Sender goroutine: issue one NOTIFY on the "logs" channel per fixture
// ID, with the ID as the payload.
// NOTE(review): c.Fatal is called from a spawned goroutine on error;
// confirm gocheck handles that as intended.
go func() {
- for i := range sinks {
- _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
+ for _, id := range logIDs {
+ _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, id))
if err != nil {
done <- true
c.Fatal(err)
// Receiver goroutine for sink si: it expects one event per fixture ID, in
// logIDs order, and stops the sink when it returns.
go func(si int, s eventSink) {
defer wg.Done()
defer sinks[si].Stop()
- for i := 0; i <= si; i++ {
+ for _, logID := range logIDs {
ev := <-sinks[si].Channel()
- c.Logf("sink %d received event %d", si, i)
- c.Check(ev.LogID, check.Equals, uint64(i))
+ c.Logf("sink %d received event %d", si, logID)
+ c.Check(ev.LogID, check.Equals, int64(logID))
row := ev.Detail()
- if i == 0 {
- // no matching row, null event
- c.Check(row, check.IsNil)
- } else {
- c.Check(row, check.NotNil)
- c.Check(row.ID, check.Equals, uint64(i))
// Only inspect the row's fields when the NotNil check passed, so a
// missing row is reported as a check failure rather than a nil
// dereference.
+ if c.Check(row, check.NotNil) {
+ c.Check(row.ID, check.Equals, int64(logID))
c.Check(row.UUID, check.Not(check.Equals), "")
}
}
// Fail the test if this case is reached after waiting 10 seconds.
case <-time.After(10 * time.Second):
c.Fatal("timed out")
}
+
// Finally, the event source's DBHealth() must return nil.
+ c.Check(pges.DBHealth(), check.IsNil)
}