9 "git.curoverse.com/arvados.git/sdk/go/config"
10 check "gopkg.in/check.v1"
13 var _ = check.Suite(&eventSourceSuite{})
15 type eventSourceSuite struct{}
17 func testDBConfig() (pgConfig, error) {
26 err := config.LoadFile(&railsDB, "../api/config/database.yml")
31 "dbname": railsDB.Test.Database,
32 "host": railsDB.Test.Host,
33 "password": railsDB.Test.Password,
34 "user": railsDB.Test.Username,
39 func testDB() (*sql.DB, error) {
40 cfg, err := testDBConfig()
44 return sql.Open("postgres", cfg.ConnectionString())
47 func (*eventSourceSuite) TestEventSource(c *check.C) {
48 cfg, err := testDBConfig()
52 pges := &pgEventSource{
53 DataSource: cfg.ConnectionString(),
57 sinks := make([]eventSink, 18)
58 for i := range sinks {
59 sinks[i] = pges.NewSink()
62 // wait for listener to start, as evidenced by queue channel
63 // appearing (relying on internal implementation detail here)
64 for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) {
65 c.Assert(time.Now().After(deadline), check.Equals, false)
70 c.Assert(err, check.IsNil)
72 done := make(chan bool, 1)
75 for i := range sinks {
76 _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
87 for si, s := range sinks {
88 go func(si int, s eventSink) {
90 defer sinks[si].Stop()
91 for i := 0; i <= si; i++ {
92 ev := <-sinks[si].Channel()
93 c.Logf("sink %d received event %d", si, i)
94 c.Check(ev.LogID, check.Equals, uint64(i))
97 // no matching row, null event
98 c.Check(row, check.IsNil)
100 c.Check(row, check.NotNil)
101 c.Check(row.ID, check.Equals, uint64(i))
102 c.Check(row.UUID, check.Not(check.Equals), "")
114 case <-time.After(10 * time.Second):