1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/ctxlog"
17 "github.com/ghodss/yaml"
18 "github.com/prometheus/client_golang/prometheus"
19 check "gopkg.in/check.v1"
// Register an eventSourceSuite instance with the gocheck runner.
22 var _ = check.Suite(&eventSourceSuite{})
// eventSourceSuite is the (stateless) receiver type for the event source tests.
24 type eventSourceSuite struct{}
// testDBConfig returns the PostgreSQL connection settings of the "zzzzz"
// cluster, as read from the default Arvados config file.
26 func testDBConfig() arvados.PostgreSQLConnection {
27 cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
// NOTE(review): the handling of err is not visible here -- confirm that a
// failed config load stops execution before cfg is used below.
31 cc, err := cfg.GetCluster("zzzzz")
35 return cc.PostgreSQL.Connection
// testDB opens a database/sql handle with the "postgres" driver, using the
// string form of testDBConfig() as the data source name.
// NOTE(review): how err is handled is not visible here -- confirm.
38 func testDB() *sql.DB {
39 db, err := sql.Open("postgres", testDBConfig().String())
// TestEventSource loads the log fixtures, issues a NOTIFY on the "logs"
// channel for each fixture ID, and checks that each of 18 sinks created from
// one pgEventSource receives events whose LogID matches those IDs in the
// same order. It finishes by checking that pges.DBHealth() reports no error.
46 func (*eventSourceSuite) TestEventSource(c *check.C) {
// Fixtures are keyed by name; only the fields declared in this struct are
// decoded from the YAML file.
47 var logfixtures map[string]struct {
50 yamldata, err := ioutil.ReadFile("../api/test/fixtures/logs.yml")
51 c.Assert(err, check.IsNil)
52 c.Assert(yaml.Unmarshal(yamldata, &logfixtures), check.IsNil)
// Collect the fixture IDs. The resulting logIDs slice is used below both as
// the order in which notifications are sent and as the order in which each
// sink is expected to receive them.
54 for _, logfixture := range logfixtures {
55 logIDs = append(logIDs, logfixture.ID)
// Event source under test, with a test logger and its own metrics registry.
61 pges := &pgEventSource{
62 DataSource: cfg.String(),
64 Logger: ctxlog.TestLogger(c),
65 Reg: prometheus.NewRegistry(),
// Create 18 independent sinks on the same event source.
68 sinks := make([]eventSink, 18)
69 for i := range sinks {
70 sinks[i] = pges.NewSink()
// NOTE(review): how done is signalled and awaited is not visible here --
// presumably it is paired with the timeout case below; confirm.
76 done := make(chan bool, 1)
// Send one notification per fixture ID, with the ID as the payload.
79 for _, id := range logIDs {
80 _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, id))
// Start one goroutine per sink to consume and verify its events.
91 for si, s := range sinks {
92 go func(si int, s eventSink) {
// Stop this sink when the goroutine returns.
94 defer sinks[si].Stop()
// Receive exactly one event per expected ID, in logIDs order.
95 for _, logID := range logIDs {
96 ev := <-sinks[si].Channel()
// The logged number is the expected ID (logID), not ev.LogID; the next
// line checks that the two agree.
97 c.Logf("sink %d received event %d", si, logID)
98 c.Check(ev.LogID, check.Equals, int64(logID))
// NOTE(review): row is assigned on a line not visible here. The checks
// below require it to be non-nil, to carry the same ID, and to have a
// non-empty UUID.
100 if c.Check(row, check.NotNil) {
101 c.Check(row.ID, check.Equals, int64(logID))
102 c.Check(row.UUID, check.Not(check.Equals), "")
// Timeout branch, taken after 10 seconds.
// NOTE(review): the body of this case is not visible here -- presumably it
// fails the test; confirm.
114 case <-time.After(10 * time.Second):
// The event source must report no DB health error at this point.
118 c.Check(pges.DBHealth(), check.IsNil)