package main import ( "database/sql" "fmt" "sync" "time" "git.curoverse.com/arvados.git/sdk/go/config" check "gopkg.in/check.v1" ) var _ = check.Suite(&eventSourceSuite{}) type eventSourceSuite struct{} func testDBConfig() pgConfig { var railsDB struct { Test struct { Database string Username string Password string Host string } } err := config.LoadFile(&railsDB, "../api/config/database.yml") if err != nil { panic(err) } cfg := pgConfig{ "dbname": railsDB.Test.Database, "host": railsDB.Test.Host, "password": railsDB.Test.Password, "user": railsDB.Test.Username, } return cfg } func testDB() *sql.DB { db, err := sql.Open("postgres", testDBConfig().ConnectionString()) if err != nil { panic(err) } return db } func (*eventSourceSuite) TestEventSource(c *check.C) { cfg := testDBConfig() db := testDB() pges := &pgEventSource{ DataSource: cfg.ConnectionString(), QueueSize: 4, } go pges.Run() sinks := make([]eventSink, 18) for i := range sinks { sinks[i] = pges.NewSink() } pges.WaitReady() defer pges.cancel() done := make(chan bool, 1) go func() { for i := range sinks { _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i)) if err != nil { done <- true c.Fatal(err) return } } }() var wg sync.WaitGroup wg.Add(len(sinks)) for si, s := range sinks { go func(si int, s eventSink) { defer wg.Done() defer sinks[si].Stop() for i := 0; i <= si; i++ { ev := <-sinks[si].Channel() c.Logf("sink %d received event %d", si, i) c.Check(ev.LogID, check.Equals, uint64(i)) row := ev.Detail() if i == 0 { // no matching row, null event c.Check(row, check.IsNil) } else { c.Check(row, check.NotNil) c.Check(row.ID, check.Equals, uint64(i)) c.Check(row.UUID, check.Not(check.Equals), "") } } }(si, s) } go func() { wg.Wait() done <- true }() select { case <-done: case <-time.After(10 * time.Second): c.Fatal("timed out") } c.Check(pges.DBHealth(), check.IsNil) }