Merge branch '14920-unknown-booting-race'
[arvados.git] / services / ws / event_source_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "database/sql"
9         "fmt"
10         "os"
11         "path/filepath"
12         "sync"
13         "time"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         check "gopkg.in/check.v1"
17 )
18
19 var _ = check.Suite(&eventSourceSuite{})
20
21 type eventSourceSuite struct{}
22
23 func testDBConfig() arvados.PostgreSQLConnection {
24         cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
25         if err != nil {
26                 panic(err)
27         }
28         cc, err := cfg.GetCluster("zzzzz")
29         if err != nil {
30                 panic(err)
31         }
32         return cc.PostgreSQL.Connection
33 }
34
35 func testDB() *sql.DB {
36         db, err := sql.Open("postgres", testDBConfig().String())
37         if err != nil {
38                 panic(err)
39         }
40         return db
41 }
42
43 func (*eventSourceSuite) TestEventSource(c *check.C) {
44         cfg := testDBConfig()
45         db := testDB()
46         pges := &pgEventSource{
47                 DataSource: cfg.String(),
48                 QueueSize:  4,
49         }
50         go pges.Run()
51         sinks := make([]eventSink, 18)
52         for i := range sinks {
53                 sinks[i] = pges.NewSink()
54         }
55
56         pges.WaitReady()
57         defer pges.cancel()
58
59         done := make(chan bool, 1)
60
61         go func() {
62                 for i := range sinks {
63                         _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
64                         if err != nil {
65                                 done <- true
66                                 c.Fatal(err)
67                                 return
68                         }
69                 }
70         }()
71
72         var wg sync.WaitGroup
73         wg.Add(len(sinks))
74         for si, s := range sinks {
75                 go func(si int, s eventSink) {
76                         defer wg.Done()
77                         defer sinks[si].Stop()
78                         for i := 0; i <= si; i++ {
79                                 ev := <-sinks[si].Channel()
80                                 c.Logf("sink %d received event %d", si, i)
81                                 c.Check(ev.LogID, check.Equals, uint64(i))
82                                 row := ev.Detail()
83                                 if i == 0 {
84                                         // no matching row, null event
85                                         c.Check(row, check.IsNil)
86                                 } else {
87                                         c.Check(row, check.NotNil)
88                                         c.Check(row.ID, check.Equals, uint64(i))
89                                         c.Check(row.UUID, check.Not(check.Equals), "")
90                                 }
91                         }
92                 }(si, s)
93         }
94         go func() {
95                 wg.Wait()
96                 done <- true
97         }()
98
99         select {
100         case <-done:
101         case <-time.After(10 * time.Second):
102                 c.Fatal("timed out")
103         }
104
105         c.Check(pges.DBHealth(), check.IsNil)
106 }