1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/sdk/go/arvados"
14 "git.arvados.org/arvados.git/sdk/go/ctxlog"
15 "github.com/prometheus/client_golang/prometheus"
16 check "gopkg.in/check.v1"
19 var _ = check.Suite(&eventSourceSuite{})
21 type eventSourceSuite struct{}
23 func testDBConfig() arvados.PostgreSQLConnection {
24 cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
28 cc, err := cfg.GetCluster("zzzzz")
32 return cc.PostgreSQL.Connection
35 func testDB() *sql.DB {
36 db, err := sql.Open("postgres", testDBConfig().String())
43 func (*eventSourceSuite) TestEventSource(c *check.C) {
46 pges := &pgEventSource{
47 DataSource: cfg.String(),
49 Logger: ctxlog.TestLogger(c),
50 Reg: prometheus.NewRegistry(),
53 sinks := make([]eventSink, 18)
54 for i := range sinks {
55 sinks[i] = pges.NewSink()
61 done := make(chan bool, 1)
64 for i := range sinks {
65 _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
76 for si, s := range sinks {
77 go func(si int, s eventSink) {
79 defer sinks[si].Stop()
80 for i := 0; i <= si; i++ {
81 ev := <-sinks[si].Channel()
82 c.Logf("sink %d received event %d", si, i)
83 c.Check(ev.LogID, check.Equals, int64(i))
86 // no matching row, null event
87 c.Check(row, check.IsNil)
89 c.Check(row, check.NotNil)
90 c.Check(row.ID, check.Equals, int64(i))
91 c.Check(row.UUID, check.Not(check.Equals), "")
103 case <-time.After(10 * time.Second):
107 c.Check(pges.DBHealth(), check.IsNil)