10764: Simplify test server shutdown.
[arvados.git] / services / ws / session_v0_test.go
1 package main
2
3 import (
4         "bytes"
5         "encoding/json"
6         "fmt"
7         "io"
8         "net"
9         "net/http"
10         "net/url"
11         "os"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
16         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
17         "golang.org/x/net/websocket"
18         check "gopkg.in/check.v1"
19 )
20
21 func init() {
22         if os.Getenv("ARVADOS_DEBUG") != "" {
23                 ctxlog.SetLevel("debug")
24         }
25 }
26
27 var _ = check.Suite(&v0Suite{})
28
29 type v0Suite struct {
30         token    string
31         toDelete []string
32 }
33
34 func (s *v0Suite) SetUpTest(c *check.C) {
35         s.token = arvadostest.ActiveToken
36 }
37
38 func (s *v0Suite) TearDownSuite(c *check.C) {
39         ac := arvados.NewClientFromEnv()
40         ac.AuthToken = arvadostest.AdminToken
41         for _, path := range s.toDelete {
42                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
43                 if err != nil {
44                         panic(err)
45                 }
46         }
47 }
48
49 func (s *v0Suite) TestFilters(c *check.C) {
50         srv, conn, r, w := s.testClient()
51         defer srv.Close()
52         defer conn.Close()
53
54         c.Check(w.Encode(map[string]interface{}{
55                 "method":  "subscribe",
56                 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
57         }), check.IsNil)
58         s.expectStatus(c, r, 200)
59
60         go s.emitEvents(nil)
61         lg := s.expectLog(c, r)
62         c.Check(lg.EventType, check.Equals, "update")
63 }
64
65 func (s *v0Suite) TestLastLogID(c *check.C) {
66         var lastID uint64
67         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
68
69         srv, conn, r, w := s.testClient()
70         defer srv.Close()
71         defer conn.Close()
72
73         uuidChan := make(chan string, 2)
74         s.emitEvents(uuidChan)
75
76         c.Check(w.Encode(map[string]interface{}{
77                 "method":      "subscribe",
78                 "last_log_id": lastID,
79         }), check.IsNil)
80         s.expectStatus(c, r, 200)
81
82         go func() {
83                 s.emitEvents(uuidChan)
84                 close(uuidChan)
85         }()
86
87         done := make(chan bool)
88         go func() {
89                 for uuid := range uuidChan {
90                         for _, etype := range []string{"create", "blip", "update"} {
91                                 lg := s.expectLog(c, r)
92                                 c.Check(lg.ObjectUUID, check.Equals, uuid)
93                                 c.Check(lg.EventType, check.Equals, etype)
94                         }
95                 }
96                 close(done)
97         }()
98
99         select {
100         case <-time.After(10 * time.Second):
101                 c.Fatal("timeout")
102         case <-done:
103         }
104 }
105
106 func (s *v0Suite) TestPermission(c *check.C) {
107         srv, conn, r, w := s.testClient()
108         defer srv.Close()
109         defer conn.Close()
110
111         c.Check(w.Encode(map[string]interface{}{
112                 "method": "subscribe",
113         }), check.IsNil)
114         s.expectStatus(c, r, 200)
115
116         uuidChan := make(chan string, 1)
117         go func() {
118                 s.token = arvadostest.AdminToken
119                 s.emitEvents(nil)
120                 s.token = arvadostest.ActiveToken
121                 s.emitEvents(uuidChan)
122         }()
123
124         lg := s.expectLog(c, r)
125         c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
126 }
127
128 func (s *v0Suite) TestSendBadJSON(c *check.C) {
129         srv, conn, r, w := s.testClient()
130         defer srv.Close()
131         defer conn.Close()
132
133         c.Check(w.Encode(map[string]interface{}{
134                 "method": "subscribe",
135         }), check.IsNil)
136         s.expectStatus(c, r, 200)
137
138         _, err := fmt.Fprint(conn, "^]beep\n")
139         c.Check(err, check.IsNil)
140         s.expectStatus(c, r, 400)
141
142         c.Check(w.Encode(map[string]interface{}{
143                 "method": "subscribe",
144         }), check.IsNil)
145         s.expectStatus(c, r, 200)
146 }
147
148 func (s *v0Suite) TestSubscribe(c *check.C) {
149         srv, conn, r, w := s.testClient()
150         defer srv.Close()
151         defer conn.Close()
152
153         s.emitEvents(nil)
154
155         err := w.Encode(map[string]interface{}{"21": 12})
156         c.Check(err, check.IsNil)
157         s.expectStatus(c, r, 400)
158
159         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
160         c.Check(err, check.IsNil)
161         s.expectStatus(c, r, 200)
162
163         uuidChan := make(chan string, 1)
164         go s.emitEvents(uuidChan)
165         uuid := <-uuidChan
166
167         for _, etype := range []string{"create", "blip", "update"} {
168                 lg := s.expectLog(c, r)
169                 c.Check(lg.ObjectUUID, check.Equals, uuid)
170                 c.Check(lg.EventType, check.Equals, etype)
171         }
172 }
173
174 // Generate some events by creating and updating a workflow object,
175 // and creating a custom log entry (event_type="blip") about the newly
176 // created workflow. If uuidChan is not nil, send the new workflow
177 // UUID to uuidChan as soon as it's known.
178 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
179         ac := arvados.NewClientFromEnv()
180         ac.AuthToken = s.token
181         wf := &arvados.Workflow{
182                 Name: "ws_test",
183         }
184         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
185         if err != nil {
186                 panic(err)
187         }
188         if uuidChan != nil {
189                 uuidChan <- wf.UUID
190         }
191         lg := &arvados.Log{}
192         err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
193                 ObjectUUID: wf.UUID,
194                 EventType:  "blip",
195                 Properties: map[string]interface{}{
196                         "beep": "boop",
197                 },
198         }), nil)
199         if err != nil {
200                 panic(err)
201         }
202         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
203         if err != nil {
204                 panic(err)
205         }
206         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
207 }
208
209 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
210         j, err := json.Marshal(ob)
211         if err != nil {
212                 panic(err)
213         }
214         v := url.Values{}
215         v[rscName] = []string{string(j)}
216         return bytes.NewBufferString(v.Encode())
217 }
218
219 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
220         msg := map[string]interface{}{}
221         c.Check(r.Decode(&msg), check.IsNil)
222         c.Check(int(msg["status"].(float64)), check.Equals, status)
223 }
224
225 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
226         lg := &arvados.Log{}
227         c.Check(r.Decode(lg), check.IsNil)
228         return lg
229 }
230
231 func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
232         srv := newTestServer()
233         conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
234         if err != nil {
235                 panic(err)
236         }
237         w := json.NewEncoder(conn)
238         r := json.NewDecoder(conn)
239         return srv, conn, r, w
240 }
241
242 type testServer struct {
243         http.Server
244         addr string
245         ln   net.Listener
246         pges *pgEventSource
247 }
248
249 func (srv *testServer) Close() {
250         srv.ln.Close()
251         srv.pges.cancel()
252 }
253
254 func newTestServer() *testServer {
255         ln, err := net.Listen("tcp", ":")
256         if err != nil {
257                 panic(err)
258         }
259         cfg := defaultConfig()
260         cfg.Client = *(arvados.NewClientFromEnv())
261         pges := &pgEventSource{
262                 DataSource: testDBConfig().ConnectionString(),
263                 QueueSize:  4,
264         }
265         srv := &testServer{
266                 Server: http.Server{
267                         Addr:         ":",
268                         ReadTimeout:  10 * time.Second,
269                         WriteTimeout: 10 * time.Second,
270                         Handler: &router{
271                                 Config:         &cfg,
272                                 eventSource:    pges,
273                                 newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
274                         },
275                 },
276                 addr: ln.Addr().String(),
277                 ln:   ln,
278                 pges: pges,
279         }
280         go pges.Run()
281         go srv.Serve(ln)
282         pges.waitReady()
283         return srv
284 }