10764: Permission tests. Support PDH permission check.
[arvados.git] / services / ws / event_source_test.go
1 package main
2
3 import (
4         "database/sql"
5         "fmt"
6         "sync"
7         "time"
8
9         "git.curoverse.com/arvados.git/sdk/go/config"
10         check "gopkg.in/check.v1"
11 )
12
13 var _ = check.Suite(&eventSourceSuite{})
14
15 type eventSourceSuite struct{}
16
17 func testDBConfig() (pgConfig, error) {
18         var railsDB struct {
19                 Test struct {
20                         Database string
21                         Username string
22                         Password string
23                         Host     string
24                 }
25         }
26         err := config.LoadFile(&railsDB, "../api/config/database.yml")
27         if err != nil {
28                 return nil, err
29         }
30         cfg := pgConfig{
31                 "dbname":   railsDB.Test.Database,
32                 "host":     railsDB.Test.Host,
33                 "password": railsDB.Test.Password,
34                 "user":     railsDB.Test.Username,
35         }
36         return cfg, nil
37 }
38
39 func testDB() (*sql.DB, error) {
40         cfg, err := testDBConfig()
41         if err != nil {
42                 return nil, err
43         }
44         return sql.Open("postgres", cfg.ConnectionString())
45 }
46
47 func (*eventSourceSuite) TestEventSource(c *check.C) {
48         cfg, err := testDBConfig()
49         if err != nil {
50                 c.Fatal(err)
51         }
52         pges := &pgEventSource{
53                 DataSource: cfg.ConnectionString(),
54                 QueueSize:  4,
55         }
56         go pges.Run()
57         sinks := make([]eventSink, 18)
58         for i := range sinks {
59                 sinks[i] = pges.NewSink()
60         }
61
62         // wait for listener to start, as evidenced by queue channel
63         // appearing (relying on internal implementation detail here)
64         for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) {
65                 c.Assert(time.Now().After(deadline), check.Equals, false)
66         }
67         defer pges.cancel()
68
69         db, err := testDB()
70         c.Assert(err, check.IsNil)
71
72         done := make(chan bool, 1)
73
74         go func() {
75                 for i := range sinks {
76                         _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
77                         if err != nil {
78                                 done <- true
79                                 c.Fatal(err)
80                                 return
81                         }
82                 }
83         }()
84
85         var wg sync.WaitGroup
86         wg.Add(len(sinks))
87         for si, s := range sinks {
88                 go func(si int, s eventSink) {
89                         defer wg.Done()
90                         defer sinks[si].Stop()
91                         for i := 0; i <= si; i++ {
92                                 ev := <-sinks[si].Channel()
93                                 c.Logf("sink %d received event %d", si, i)
94                                 c.Check(ev.LogID, check.Equals, uint64(i))
95                                 row := ev.Detail()
96                                 if i == 0 {
97                                         // no matching row, null event
98                                         c.Check(row, check.IsNil)
99                                 } else {
100                                         c.Check(row, check.NotNil)
101                                         c.Check(row.ID, check.Equals, uint64(i))
102                                         c.Check(row.UUID, check.Not(check.Equals), "")
103                                 }
104                         }
105                 }(si, s)
106         }
107         go func() {
108                 wg.Wait()
109                 done <- true
110         }()
111
112         select {
113         case <-done:
114         case <-time.After(10 * time.Second):
115                 c.Fatal("timed out")
116         }
117 }