11168: Remove unused import.
[arvados.git] / services / ws / event_source_test.go
1 package main
2
3 import (
4         "database/sql"
5         "fmt"
6         "sync"
7         "time"
8
9         "git.curoverse.com/arvados.git/sdk/go/config"
10         check "gopkg.in/check.v1"
11 )
12
13 var _ = check.Suite(&eventSourceSuite{})
14
15 type eventSourceSuite struct{}
16
17 func testDBConfig() pgConfig {
18         var railsDB struct {
19                 Test struct {
20                         Database string
21                         Username string
22                         Password string
23                         Host     string
24                 }
25         }
26         err := config.LoadFile(&railsDB, "../api/config/database.yml")
27         if err != nil {
28                 panic(err)
29         }
30         cfg := pgConfig{
31                 "dbname":   railsDB.Test.Database,
32                 "host":     railsDB.Test.Host,
33                 "password": railsDB.Test.Password,
34                 "user":     railsDB.Test.Username,
35         }
36         return cfg
37 }
38
39 func testDB() *sql.DB {
40         db, err := sql.Open("postgres", testDBConfig().ConnectionString())
41         if err != nil {
42                 panic(err)
43         }
44         return db
45 }
46
47 func (*eventSourceSuite) TestEventSource(c *check.C) {
48         cfg := testDBConfig()
49         db := testDB()
50         pges := &pgEventSource{
51                 DataSource: cfg.ConnectionString(),
52                 QueueSize:  4,
53         }
54         go pges.Run()
55         sinks := make([]eventSink, 18)
56         for i := range sinks {
57                 sinks[i] = pges.NewSink()
58         }
59
60         pges.WaitReady()
61         defer pges.cancel()
62
63         done := make(chan bool, 1)
64
65         go func() {
66                 for i := range sinks {
67                         _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
68                         if err != nil {
69                                 done <- true
70                                 c.Fatal(err)
71                                 return
72                         }
73                 }
74         }()
75
76         var wg sync.WaitGroup
77         wg.Add(len(sinks))
78         for si, s := range sinks {
79                 go func(si int, s eventSink) {
80                         defer wg.Done()
81                         defer sinks[si].Stop()
82                         for i := 0; i <= si; i++ {
83                                 ev := <-sinks[si].Channel()
84                                 c.Logf("sink %d received event %d", si, i)
85                                 c.Check(ev.LogID, check.Equals, uint64(i))
86                                 row := ev.Detail()
87                                 if i == 0 {
88                                         // no matching row, null event
89                                         c.Check(row, check.IsNil)
90                                 } else {
91                                         c.Check(row, check.NotNil)
92                                         c.Check(row.ID, check.Equals, uint64(i))
93                                         c.Check(row.UUID, check.Not(check.Equals), "")
94                                 }
95                         }
96                 }(si, s)
97         }
98         go func() {
99                 wg.Wait()
100                 done <- true
101         }()
102
103         select {
104         case <-done:
105         case <-time.After(10 * time.Second):
106                 c.Fatal("timed out")
107         }
108 }