Merge branch '8784-dir-listings'
[arvados.git] / services / ws / event_source_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "database/sql"
9         "fmt"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/config"
14         check "gopkg.in/check.v1"
15 )
16
17 var _ = check.Suite(&eventSourceSuite{})
18
19 type eventSourceSuite struct{}
20
21 func testDBConfig() pgConfig {
22         var railsDB struct {
23                 Test struct {
24                         Database string
25                         Username string
26                         Password string
27                         Host     string
28                 }
29         }
30         err := config.LoadFile(&railsDB, "../api/config/database.yml")
31         if err != nil {
32                 panic(err)
33         }
34         cfg := pgConfig{
35                 "dbname":   railsDB.Test.Database,
36                 "host":     railsDB.Test.Host,
37                 "password": railsDB.Test.Password,
38                 "user":     railsDB.Test.Username,
39         }
40         return cfg
41 }
42
43 func testDB() *sql.DB {
44         db, err := sql.Open("postgres", testDBConfig().ConnectionString())
45         if err != nil {
46                 panic(err)
47         }
48         return db
49 }
50
51 func (*eventSourceSuite) TestEventSource(c *check.C) {
52         cfg := testDBConfig()
53         db := testDB()
54         pges := &pgEventSource{
55                 DataSource: cfg.ConnectionString(),
56                 QueueSize:  4,
57         }
58         go pges.Run()
59         sinks := make([]eventSink, 18)
60         for i := range sinks {
61                 sinks[i] = pges.NewSink()
62         }
63
64         pges.WaitReady()
65         defer pges.cancel()
66
67         done := make(chan bool, 1)
68
69         go func() {
70                 for i := range sinks {
71                         _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
72                         if err != nil {
73                                 done <- true
74                                 c.Fatal(err)
75                                 return
76                         }
77                 }
78         }()
79
80         var wg sync.WaitGroup
81         wg.Add(len(sinks))
82         for si, s := range sinks {
83                 go func(si int, s eventSink) {
84                         defer wg.Done()
85                         defer sinks[si].Stop()
86                         for i := 0; i <= si; i++ {
87                                 ev := <-sinks[si].Channel()
88                                 c.Logf("sink %d received event %d", si, i)
89                                 c.Check(ev.LogID, check.Equals, uint64(i))
90                                 row := ev.Detail()
91                                 if i == 0 {
92                                         // no matching row, null event
93                                         c.Check(row, check.IsNil)
94                                 } else {
95                                         c.Check(row, check.NotNil)
96                                         c.Check(row.ID, check.Equals, uint64(i))
97                                         c.Check(row.UUID, check.Not(check.Equals), "")
98                                 }
99                         }
100                 }(si, s)
101         }
102         go func() {
103                 wg.Wait()
104                 done <- true
105         }()
106
107         select {
108         case <-done:
109         case <-time.After(10 * time.Second):
110                 c.Fatal("timed out")
111         }
112
113         c.Check(pges.DBHealth(), check.IsNil)
114 }