1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.curoverse.com/arvados.git/sdk/go/config"
14 check "gopkg.in/check.v1"
// Register eventSourceSuite with the gocheck test runner.
17 var _ = check.Suite(&eventSourceSuite{})
// eventSourceSuite is the field-less receiver type for the event source
// tests in this file.
19 type eventSourceSuite struct{}
// testDBConfig returns a pgConfig whose dbname/host/password/user entries
// are taken from the Test section of ../api/config/database.yml.
21 func testDBConfig() pgConfig {
// Load the database settings file into railsDB (path is relative).
30 err := config.LoadFile(&railsDB, "../api/config/database.yml")
// Map the Test section's fields onto connection parameter keys.
35 "dbname": railsDB.Test.Database,
36 "host": railsDB.Test.Host,
37 "password": railsDB.Test.Password,
38 "user": railsDB.Test.Username,
// testDB opens a *sql.DB with the "postgres" driver, using the connection
// string produced by testDBConfig().
43 func testDB() *sql.DB {
44 db, err := sql.Open("postgres", testDBConfig().ConnectionString())
// TestEventSource creates several sinks on one pgEventSource, issues
// NOTIFY commands on the "logs" channel, and checks the events each sink
// receives.
51 func (*eventSourceSuite) TestEventSource(c *check.C) {
// The event source under test connects with cfg's connection string.
54 pges := &pgEventSource{
55 DataSource: cfg.ConnectionString(),
// Create 18 sinks, all fed by the same event source.
59 sinks := make([]eventSink, 18)
60 for i := range sinks {
61 sinks[i] = pges.NewSink()
// NOTE(review): capacity-1 channel; presumably signals that all sink
// goroutines have finished -- confirm against the code that sends on it.
67 done := make(chan bool, 1)
// Send one notification per sink index; the payload is the index in decimal.
70 for i := range sinks {
71 _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
// Start one goroutine per sink; si and s are passed as arguments so each
// goroutine works with its own copies.
82 for si, s := range sinks {
83 go func(si int, s eventSink) {
// Stop this sink when the goroutine returns.
85 defer sinks[si].Stop()
// Sink number si reads si+1 events (i = 0..si) and expects the i-th event
// it receives to carry LogID i.
86 for i := 0; i <= si; i++ {
87 ev := <-sinks[si].Channel()
88 c.Logf("sink %d received event %d", si, i)
89 c.Check(ev.LogID, check.Equals, uint64(i))
92 // no matching row, null event
93 c.Check(row, check.IsNil)
// Otherwise a row is expected, with an ID equal to i and a non-empty UUID.
// NOTE(review): the branch condition and the origin of row are outside the
// visible lines -- confirm.
95 c.Check(row, check.NotNil)
96 c.Check(row.ID, check.Equals, uint64(i))
97 c.Check(row.UUID, check.Not(check.Equals), "")
// Timeout branch: fires after 10 seconds.
109 case <-time.After(10 * time.Second):
// DBHealth() is expected to return nil at this point.
113 c.Check(pges.DBHealth(), check.IsNil)