16217: Add prometheus metrics.
[arvados.git] / services / ws / event_source_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "database/sql"
9         "fmt"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "git.arvados.org/arvados.git/sdk/go/ctxlog"
15         "github.com/prometheus/client_golang/prometheus"
16         check "gopkg.in/check.v1"
17 )
18
19 var _ debugStatuser = (*pgEventSource)(nil)
20
21 var _ = check.Suite(&eventSourceSuite{})
22
23 type eventSourceSuite struct{}
24
25 func testDBConfig() arvados.PostgreSQLConnection {
26         cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
27         if err != nil {
28                 panic(err)
29         }
30         cc, err := cfg.GetCluster("zzzzz")
31         if err != nil {
32                 panic(err)
33         }
34         return cc.PostgreSQL.Connection
35 }
36
37 func testDB() *sql.DB {
38         db, err := sql.Open("postgres", testDBConfig().String())
39         if err != nil {
40                 panic(err)
41         }
42         return db
43 }
44
45 func (*eventSourceSuite) TestEventSource(c *check.C) {
46         cfg := testDBConfig()
47         db := testDB()
48         pges := &pgEventSource{
49                 DataSource: cfg.String(),
50                 QueueSize:  4,
51                 Logger:     ctxlog.TestLogger(c),
52                 Reg:        prometheus.NewRegistry(),
53         }
54         go pges.Run()
55         sinks := make([]eventSink, 18)
56         for i := range sinks {
57                 sinks[i] = pges.NewSink()
58         }
59
60         pges.WaitReady()
61         defer pges.cancel()
62
63         done := make(chan bool, 1)
64
65         go func() {
66                 for i := range sinks {
67                         _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
68                         if err != nil {
69                                 done <- true
70                                 c.Fatal(err)
71                                 return
72                         }
73                 }
74         }()
75
76         var wg sync.WaitGroup
77         wg.Add(len(sinks))
78         for si, s := range sinks {
79                 go func(si int, s eventSink) {
80                         defer wg.Done()
81                         defer sinks[si].Stop()
82                         for i := 0; i <= si; i++ {
83                                 ev := <-sinks[si].Channel()
84                                 c.Logf("sink %d received event %d", si, i)
85                                 c.Check(ev.LogID, check.Equals, uint64(i))
86                                 row := ev.Detail()
87                                 if i == 0 {
88                                         // no matching row, null event
89                                         c.Check(row, check.IsNil)
90                                 } else {
91                                         c.Check(row, check.NotNil)
92                                         c.Check(row.ID, check.Equals, uint64(i))
93                                         c.Check(row.UUID, check.Not(check.Equals), "")
94                                 }
95                         }
96                 }(si, s)
97         }
98         go func() {
99                 wg.Wait()
100                 done <- true
101         }()
102
103         select {
104         case <-done:
105         case <-time.After(10 * time.Second):
106                 c.Fatal("timed out")
107         }
108
109         c.Check(pges.DBHealth(), check.IsNil)
110 }