11960: Fix races in tests.
[arvados.git] / services / ws / session_v0_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "net/url"
13         "os"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
19         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
20         "golang.org/x/net/websocket"
21         check "gopkg.in/check.v1"
22 )
23
24 func init() {
25         if os.Getenv("ARVADOS_DEBUG") != "" {
26                 ctxlog.SetLevel("debug")
27         }
28 }
29
30 var _ = check.Suite(&v0Suite{})
31
32 type v0Suite struct {
33         serverSuite serverSuite
34         token       string
35         toDelete    []string
36         wg          sync.WaitGroup
37         ignoreLogID uint64
38 }
39
40 func (s *v0Suite) SetUpTest(c *check.C) {
41         s.serverSuite.SetUpTest(c)
42         go s.serverSuite.srv.Run()
43         s.serverSuite.srv.WaitReady()
44
45         s.token = arvadostest.ActiveToken
46         s.ignoreLogID = s.lastLogID(c)
47 }
48
49 func (s *v0Suite) TearDownTest(c *check.C) {
50         s.wg.Wait()
51         s.serverSuite.srv.Close()
52 }
53
54 func (s *v0Suite) TearDownSuite(c *check.C) {
55         s.deleteTestObjects(c)
56 }
57
58 func (s *v0Suite) deleteTestObjects(c *check.C) {
59         ac := arvados.NewClientFromEnv()
60         ac.AuthToken = arvadostest.AdminToken
61         for _, path := range s.toDelete {
62                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
63                 if err != nil {
64                         panic(err)
65                 }
66         }
67         s.toDelete = nil
68 }
69
70 func (s *v0Suite) TestFilters(c *check.C) {
71         conn, r, w := s.testClient()
72         defer conn.Close()
73
74         c.Check(w.Encode(map[string]interface{}{
75                 "method":  "subscribe",
76                 "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
77         }), check.IsNil)
78         s.expectStatus(c, r, 200)
79
80         go s.emitEvents(nil)
81         lg := s.expectLog(c, r)
82         c.Check(lg.EventType, check.Equals, "update")
83 }
84
85 func (s *v0Suite) TestLastLogID(c *check.C) {
86         lastID := s.lastLogID(c)
87
88         checkLogs := func(r *json.Decoder, uuid string) {
89                 for _, etype := range []string{"create", "blip", "update"} {
90                         lg := s.expectLog(c, r)
91                         for lg.ObjectUUID != uuid {
92                                 lg = s.expectLog(c, r)
93                         }
94                         c.Check(lg.EventType, check.Equals, etype)
95                 }
96         }
97
98         // Connecting connEarly (before sending the early events) lets
99         // us confirm all of the "early" events have already passed
100         // through the server.
101         connEarly, rEarly, wEarly := s.testClient()
102         defer connEarly.Close()
103         c.Check(wEarly.Encode(map[string]interface{}{
104                 "method": "subscribe",
105         }), check.IsNil)
106         s.expectStatus(c, rEarly, 200)
107
108         // Send the early events.
109         uuidChan := make(chan string, 1)
110         s.emitEvents(uuidChan)
111         uuidEarly := <-uuidChan
112
113         // Wait for the early events to pass through.
114         checkLogs(rEarly, uuidEarly)
115
116         // Connect the client that wants to get old events via
117         // last_log_id.
118         conn, r, w := s.testClient()
119         defer conn.Close()
120
121         c.Check(w.Encode(map[string]interface{}{
122                 "method":      "subscribe",
123                 "last_log_id": lastID,
124         }), check.IsNil)
125         s.expectStatus(c, r, 200)
126
127         checkLogs(r, uuidEarly)
128         s.emitEvents(uuidChan)
129         checkLogs(r, <-uuidChan)
130 }
131
132 func (s *v0Suite) TestPermission(c *check.C) {
133         conn, r, w := s.testClient()
134         defer conn.Close()
135
136         c.Check(w.Encode(map[string]interface{}{
137                 "method": "subscribe",
138         }), check.IsNil)
139         s.expectStatus(c, r, 200)
140
141         uuidChan := make(chan string, 2)
142         go func() {
143                 s.token = arvadostest.AdminToken
144                 s.emitEvents(uuidChan)
145                 s.token = arvadostest.ActiveToken
146                 s.emitEvents(uuidChan)
147         }()
148
149         wrongUUID := <-uuidChan
150         rightUUID := <-uuidChan
151         lg := s.expectLog(c, r)
152         for lg.ObjectUUID != rightUUID {
153                 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
154                 lg = s.expectLog(c, r)
155         }
156 }
157
158 // Two users create private objects; admin deletes both objects; each
159 // user receives a "delete" event for their own object (not for the
160 // other user's object).
161 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
162         clients := []struct {
163                 token string
164                 uuid  string
165                 conn  *websocket.Conn
166                 r     *json.Decoder
167                 w     *json.Encoder
168         }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
169         for i := range clients {
170                 uuidChan := make(chan string, 1)
171                 s.token = clients[i].token
172                 s.emitEvents(uuidChan)
173                 clients[i].uuid = <-uuidChan
174                 clients[i].conn, clients[i].r, clients[i].w = s.testClient()
175
176                 c.Check(clients[i].w.Encode(map[string]interface{}{
177                         "method": "subscribe",
178                 }), check.IsNil)
179                 s.expectStatus(c, clients[i].r, 200)
180         }
181
182         s.ignoreLogID = s.lastLogID(c)
183         s.deleteTestObjects(c)
184
185         for _, client := range clients {
186                 lg := s.expectLog(c, client.r)
187                 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
188                 c.Check(lg.EventType, check.Equals, "delete")
189         }
190 }
191
192 // Trashing/deleting a collection produces an "update" event with
193 // properties["new_attributes"]["is_trashed"] == true.
194 func (s *v0Suite) TestTrashedCollection(c *check.C) {
195         ac := arvados.NewClientFromEnv()
196         ac.AuthToken = s.token
197
198         coll := &arvados.Collection{ManifestText: ""}
199         err := ac.RequestAndDecode(coll, "POST", "arvados/v1/collections", s.jsonBody("collection", coll), map[string]interface{}{"ensure_unique_name": true})
200         c.Assert(err, check.IsNil)
201         s.ignoreLogID = s.lastLogID(c)
202
203         conn, r, w := s.testClient()
204         defer conn.Close()
205
206         c.Check(w.Encode(map[string]interface{}{
207                 "method": "subscribe",
208         }), check.IsNil)
209         s.expectStatus(c, r, 200)
210
211         err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
212         c.Assert(err, check.IsNil)
213
214         lg := s.expectLog(c, r)
215         c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
216         c.Check(lg.EventType, check.Equals, "update")
217         c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
218         c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
219 }
220
221 func (s *v0Suite) TestSendBadJSON(c *check.C) {
222         conn, r, w := s.testClient()
223         defer conn.Close()
224
225         c.Check(w.Encode(map[string]interface{}{
226                 "method": "subscribe",
227         }), check.IsNil)
228         s.expectStatus(c, r, 200)
229
230         _, err := fmt.Fprint(conn, "^]beep\n")
231         c.Check(err, check.IsNil)
232         s.expectStatus(c, r, 400)
233
234         c.Check(w.Encode(map[string]interface{}{
235                 "method": "subscribe",
236         }), check.IsNil)
237         s.expectStatus(c, r, 200)
238 }
239
240 func (s *v0Suite) TestSubscribe(c *check.C) {
241         conn, r, w := s.testClient()
242         defer conn.Close()
243
244         s.emitEvents(nil)
245
246         err := w.Encode(map[string]interface{}{"21": 12})
247         c.Check(err, check.IsNil)
248         s.expectStatus(c, r, 400)
249
250         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
251         c.Check(err, check.IsNil)
252         s.expectStatus(c, r, 200)
253
254         uuidChan := make(chan string, 1)
255         go s.emitEvents(uuidChan)
256         uuid := <-uuidChan
257
258         for _, etype := range []string{"create", "blip", "update"} {
259                 lg := s.expectLog(c, r)
260                 for lg.ObjectUUID != uuid {
261                         lg = s.expectLog(c, r)
262                 }
263                 c.Check(lg.EventType, check.Equals, etype)
264         }
265 }
266
267 // Generate some events by creating and updating a workflow object,
268 // and creating a custom log entry (event_type="blip") about the newly
269 // created workflow. If uuidChan is not nil, send the new workflow
270 // UUID to uuidChan as soon as it's known.
271 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
272         s.wg.Add(1)
273         defer s.wg.Done()
274
275         ac := arvados.NewClientFromEnv()
276         ac.AuthToken = s.token
277         wf := &arvados.Workflow{
278                 Name: "ws_test",
279         }
280         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
281         if err != nil {
282                 panic(err)
283         }
284         if uuidChan != nil {
285                 uuidChan <- wf.UUID
286         }
287         lg := &arvados.Log{}
288         err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
289                 ObjectUUID: wf.UUID,
290                 EventType:  "blip",
291                 Properties: map[string]interface{}{
292                         "beep": "boop",
293                 },
294         }), nil)
295         if err != nil {
296                 panic(err)
297         }
298         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
299         if err != nil {
300                 panic(err)
301         }
302         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
303 }
304
305 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
306         j, err := json.Marshal(ob)
307         if err != nil {
308                 panic(err)
309         }
310         v := url.Values{}
311         v[rscName] = []string{string(j)}
312         return bytes.NewBufferString(v.Encode())
313 }
314
315 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
316         msg := map[string]interface{}{}
317         c.Check(r.Decode(&msg), check.IsNil)
318         c.Check(int(msg["status"].(float64)), check.Equals, status)
319 }
320
321 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
322         lg := &arvados.Log{}
323         ok := make(chan struct{})
324         go func() {
325                 for lg.ID <= s.ignoreLogID {
326                         c.Check(r.Decode(lg), check.IsNil)
327                 }
328                 close(ok)
329         }()
330         select {
331         case <-time.After(10 * time.Second):
332                 panic("timed out")
333         case <-ok:
334                 return lg
335         }
336 }
337
338 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
339         srv := s.serverSuite.srv
340         conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
341         if err != nil {
342                 panic(err)
343         }
344         w := json.NewEncoder(conn)
345         r := json.NewDecoder(conn)
346         return conn, r, w
347 }
348
349 func (s *v0Suite) lastLogID(c *check.C) uint64 {
350         var lastID uint64
351         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
352         return lastID
353 }