Merge branch '21762-flaky-search-test'
[arvados.git] / services / ws / event_source_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "database/sql"
9         "fmt"
10         "io/ioutil"
11         "sort"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/ctxlog"
17         "github.com/ghodss/yaml"
18         "github.com/prometheus/client_golang/prometheus"
19         check "gopkg.in/check.v1"
20 )
21
22 var _ = check.Suite(&eventSourceSuite{})
23
24 type eventSourceSuite struct{}
25
26 func testDBConfig() arvados.PostgreSQLConnection {
27         cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
28         if err != nil {
29                 panic(err)
30         }
31         cc, err := cfg.GetCluster("zzzzz")
32         if err != nil {
33                 panic(err)
34         }
35         return cc.PostgreSQL.Connection
36 }
37
38 func testDB() *sql.DB {
39         db, err := sql.Open("postgres", testDBConfig().String())
40         if err != nil {
41                 panic(err)
42         }
43         return db
44 }
45
46 func (*eventSourceSuite) TestEventSource(c *check.C) {
47         var logfixtures map[string]struct {
48                 ID int
49         }
50         yamldata, err := ioutil.ReadFile("../api/test/fixtures/logs.yml")
51         c.Assert(err, check.IsNil)
52         c.Assert(yaml.Unmarshal(yamldata, &logfixtures), check.IsNil)
53         var logIDs []int
54         for _, logfixture := range logfixtures {
55                 logIDs = append(logIDs, logfixture.ID)
56         }
57         sort.Ints(logIDs)
58
59         cfg := testDBConfig()
60         db := testDB()
61         pges := &pgEventSource{
62                 DataSource: cfg.String(),
63                 QueueSize:  4,
64                 Logger:     ctxlog.TestLogger(c),
65                 Reg:        prometheus.NewRegistry(),
66         }
67         go pges.Run()
68         sinks := make([]eventSink, 18)
69         for i := range sinks {
70                 sinks[i] = pges.NewSink()
71         }
72
73         pges.WaitReady()
74         defer pges.cancel()
75
76         done := make(chan bool, 1)
77
78         go func() {
79                 for _, id := range logIDs {
80                         _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, id))
81                         if err != nil {
82                                 done <- true
83                                 c.Fatal(err)
84                                 return
85                         }
86                 }
87         }()
88
89         var wg sync.WaitGroup
90         wg.Add(len(sinks))
91         for si, s := range sinks {
92                 go func(si int, s eventSink) {
93                         defer wg.Done()
94                         defer sinks[si].Stop()
95                         for _, logID := range logIDs {
96                                 ev := <-sinks[si].Channel()
97                                 c.Logf("sink %d received event %d", si, logID)
98                                 c.Check(ev.LogID, check.Equals, int64(logID))
99                                 row := ev.Detail()
100                                 if c.Check(row, check.NotNil) {
101                                         c.Check(row.ID, check.Equals, int64(logID))
102                                         c.Check(row.UUID, check.Not(check.Equals), "")
103                                 }
104                         }
105                 }(si, s)
106         }
107         go func() {
108                 wg.Wait()
109                 done <- true
110         }()
111
112         select {
113         case <-done:
114         case <-time.After(10 * time.Second):
115                 c.Fatal("timed out")
116         }
117
118         c.Check(pges.DBHealth(), check.IsNil)
119 }