Merge branch '19973-dispatch-throttle' into main
[arvados.git] / services / ws / event_source_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "database/sql"
9         "fmt"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "git.arvados.org/arvados.git/sdk/go/ctxlog"
15         "github.com/prometheus/client_golang/prometheus"
16         check "gopkg.in/check.v1"
17 )
18
19 var _ = check.Suite(&eventSourceSuite{})
20
21 type eventSourceSuite struct{}
22
23 func testDBConfig() arvados.PostgreSQLConnection {
24         cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
25         if err != nil {
26                 panic(err)
27         }
28         cc, err := cfg.GetCluster("zzzzz")
29         if err != nil {
30                 panic(err)
31         }
32         return cc.PostgreSQL.Connection
33 }
34
35 func testDB() *sql.DB {
36         db, err := sql.Open("postgres", testDBConfig().String())
37         if err != nil {
38                 panic(err)
39         }
40         return db
41 }
42
43 func (*eventSourceSuite) TestEventSource(c *check.C) {
44         cfg := testDBConfig()
45         db := testDB()
46         pges := &pgEventSource{
47                 DataSource: cfg.String(),
48                 QueueSize:  4,
49                 Logger:     ctxlog.TestLogger(c),
50                 Reg:        prometheus.NewRegistry(),
51         }
52         go pges.Run()
53         sinks := make([]eventSink, 18)
54         for i := range sinks {
55                 sinks[i] = pges.NewSink()
56         }
57
58         pges.WaitReady()
59         defer pges.cancel()
60
61         done := make(chan bool, 1)
62
63         go func() {
64                 for i := range sinks {
65                         _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
66                         if err != nil {
67                                 done <- true
68                                 c.Fatal(err)
69                                 return
70                         }
71                 }
72         }()
73
74         var wg sync.WaitGroup
75         wg.Add(len(sinks))
76         for si, s := range sinks {
77                 go func(si int, s eventSink) {
78                         defer wg.Done()
79                         defer sinks[si].Stop()
80                         for i := 0; i <= si; i++ {
81                                 ev := <-sinks[si].Channel()
82                                 c.Logf("sink %d received event %d", si, i)
83                                 c.Check(ev.LogID, check.Equals, uint64(i))
84                                 row := ev.Detail()
85                                 if i == 0 {
86                                         // no matching row, null event
87                                         c.Check(row, check.IsNil)
88                                 } else {
89                                         c.Check(row, check.NotNil)
90                                         c.Check(row.ID, check.Equals, uint64(i))
91                                         c.Check(row.UUID, check.Not(check.Equals), "")
92                                 }
93                         }
94                 }(si, s)
95         }
96         go func() {
97                 wg.Wait()
98                 done <- true
99         }()
100
101         select {
102         case <-done:
103         case <-time.After(10 * time.Second):
104                 c.Fatal("timed out")
105         }
106
107         c.Check(pges.DBHealth(), check.IsNil)
108 }