Fix a few gofmt warnings.
[arvados.git] / services / ws / session_v0_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "io"
12         "net/url"
13         "os"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadostest"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "golang.org/x/net/websocket"
22         check "gopkg.in/check.v1"
23 )
24
25 func init() {
26         if os.Getenv("ARVADOS_DEBUG") != "" {
27                 ctxlog.SetLevel("debug")
28         }
29 }
30
31 var _ = check.Suite(&v0Suite{})
32
33 type v0Suite struct {
34         serviceSuite serviceSuite
35         token        string
36         toDelete     []string
37         wg           sync.WaitGroup
38         ignoreLogID  uint64
39 }
40
41 func (s *v0Suite) SetUpTest(c *check.C) {
42         s.serviceSuite.SetUpTest(c)
43         s.serviceSuite.start(c)
44
45         s.token = arvadostest.ActiveToken
46         s.ignoreLogID = s.lastLogID(c)
47 }
48
49 func (s *v0Suite) TearDownTest(c *check.C) {
50         s.wg.Wait()
51         s.serviceSuite.TearDownTest(c)
52 }
53
54 func (s *v0Suite) TearDownSuite(c *check.C) {
55         s.deleteTestObjects(c)
56 }
57
58 func (s *v0Suite) deleteTestObjects(c *check.C) {
59         ac := arvados.NewClientFromEnv()
60         ac.AuthToken = arvadostest.AdminToken
61         for _, path := range s.toDelete {
62                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
63                 if err != nil {
64                         panic(err)
65                 }
66         }
67         s.toDelete = nil
68 }
69
70 func (s *v0Suite) TestFilters(c *check.C) {
71         conn, r, w := s.testClient()
72         defer conn.Close()
73
74         cmd := func(method, eventType string, status int) {
75                 c.Check(w.Encode(map[string]interface{}{
76                         "method":  method,
77                         "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
78                 }), check.IsNil)
79                 s.expectStatus(c, r, status)
80         }
81         cmd("subscribe", "update", 200)
82         cmd("subscribe", "update", 200)
83         cmd("subscribe", "create", 200)
84         cmd("subscribe", "update", 200)
85         cmd("unsubscribe", "blip", 400)
86         cmd("unsubscribe", "create", 200)
87         cmd("unsubscribe", "update", 200)
88
89         go s.emitEvents(nil)
90         lg := s.expectLog(c, r)
91         c.Check(lg.EventType, check.Equals, "update")
92
93         cmd("unsubscribe", "update", 200)
94         cmd("unsubscribe", "update", 200)
95         cmd("unsubscribe", "update", 400)
96 }
97
98 func (s *v0Suite) TestLastLogID(c *check.C) {
99         lastID := s.lastLogID(c)
100
101         checkLogs := func(r *json.Decoder, uuid string) {
102                 for _, etype := range []string{"create", "blip", "update"} {
103                         lg := s.expectLog(c, r)
104                         for lg.ObjectUUID != uuid {
105                                 lg = s.expectLog(c, r)
106                         }
107                         c.Check(lg.EventType, check.Equals, etype)
108                 }
109         }
110
111         // Connecting connEarly (before sending the early events) lets
112         // us confirm all of the "early" events have already passed
113         // through the server.
114         connEarly, rEarly, wEarly := s.testClient()
115         defer connEarly.Close()
116         c.Check(wEarly.Encode(map[string]interface{}{
117                 "method": "subscribe",
118         }), check.IsNil)
119         s.expectStatus(c, rEarly, 200)
120
121         // Send the early events.
122         uuidChan := make(chan string, 1)
123         s.emitEvents(uuidChan)
124         uuidEarly := <-uuidChan
125
126         // Wait for the early events to pass through.
127         checkLogs(rEarly, uuidEarly)
128
129         // Connect the client that wants to get old events via
130         // last_log_id.
131         conn, r, w := s.testClient()
132         defer conn.Close()
133
134         c.Check(w.Encode(map[string]interface{}{
135                 "method":      "subscribe",
136                 "last_log_id": lastID,
137         }), check.IsNil)
138         s.expectStatus(c, r, 200)
139
140         checkLogs(r, uuidEarly)
141         s.emitEvents(uuidChan)
142         checkLogs(r, <-uuidChan)
143 }
144
145 func (s *v0Suite) TestPermission(c *check.C) {
146         conn, r, w := s.testClient()
147         defer conn.Close()
148
149         c.Check(w.Encode(map[string]interface{}{
150                 "method": "subscribe",
151         }), check.IsNil)
152         s.expectStatus(c, r, 200)
153
154         uuidChan := make(chan string, 2)
155         go func() {
156                 s.token = arvadostest.AdminToken
157                 s.emitEvents(uuidChan)
158                 s.token = arvadostest.ActiveToken
159                 s.emitEvents(uuidChan)
160         }()
161
162         wrongUUID := <-uuidChan
163         rightUUID := <-uuidChan
164         lg := s.expectLog(c, r)
165         for lg.ObjectUUID != rightUUID {
166                 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
167                 lg = s.expectLog(c, r)
168         }
169 }
170
171 // Two users create private objects; admin deletes both objects; each
172 // user receives a "delete" event for their own object (not for the
173 // other user's object).
174 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
175         clients := []struct {
176                 token string
177                 uuid  string
178                 conn  *websocket.Conn
179                 r     *json.Decoder
180                 w     *json.Encoder
181         }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
182         for i := range clients {
183                 uuidChan := make(chan string, 1)
184                 s.token = clients[i].token
185                 s.emitEvents(uuidChan)
186                 clients[i].uuid = <-uuidChan
187                 clients[i].conn, clients[i].r, clients[i].w = s.testClient()
188
189                 c.Check(clients[i].w.Encode(map[string]interface{}{
190                         "method": "subscribe",
191                 }), check.IsNil)
192                 s.expectStatus(c, clients[i].r, 200)
193         }
194
195         s.ignoreLogID = s.lastLogID(c)
196         s.deleteTestObjects(c)
197
198         for _, client := range clients {
199                 lg := s.expectLog(c, client.r)
200                 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
201                 c.Check(lg.EventType, check.Equals, "delete")
202         }
203 }
204
205 // Trashing/deleting a collection produces an "update" event with
206 // properties["new_attributes"]["is_trashed"] == true.
207 func (s *v0Suite) TestTrashedCollection(c *check.C) {
208         ac := arvados.NewClientFromEnv()
209         ac.AuthToken = s.token
210
211         var coll arvados.Collection
212         err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
213         c.Assert(err, check.IsNil)
214         s.ignoreLogID = s.lastLogID(c)
215
216         conn, r, w := s.testClient()
217         defer conn.Close()
218
219         c.Check(w.Encode(map[string]interface{}{
220                 "method": "subscribe",
221         }), check.IsNil)
222         s.expectStatus(c, r, 200)
223
224         err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
225         c.Assert(err, check.IsNil)
226
227         lg := s.expectLog(c, r)
228         c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
229         c.Check(lg.EventType, check.Equals, "update")
230         c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
231         c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
232 }
233
234 func (s *v0Suite) TestSendBadJSON(c *check.C) {
235         conn, r, w := s.testClient()
236         defer conn.Close()
237
238         c.Check(w.Encode(map[string]interface{}{
239                 "method": "subscribe",
240         }), check.IsNil)
241         s.expectStatus(c, r, 200)
242
243         _, err := fmt.Fprint(conn, "^]beep\n")
244         c.Check(err, check.IsNil)
245         s.expectStatus(c, r, 400)
246
247         c.Check(w.Encode(map[string]interface{}{
248                 "method": "subscribe",
249         }), check.IsNil)
250         s.expectStatus(c, r, 200)
251 }
252
253 func (s *v0Suite) TestSubscribe(c *check.C) {
254         conn, r, w := s.testClient()
255         defer conn.Close()
256
257         s.emitEvents(nil)
258
259         err := w.Encode(map[string]interface{}{"21": 12})
260         c.Check(err, check.IsNil)
261         s.expectStatus(c, r, 400)
262
263         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
264         c.Check(err, check.IsNil)
265         s.expectStatus(c, r, 200)
266
267         uuidChan := make(chan string, 1)
268         go s.emitEvents(uuidChan)
269         uuid := <-uuidChan
270
271         for _, etype := range []string{"create", "blip", "update"} {
272                 lg := s.expectLog(c, r)
273                 for lg.ObjectUUID != uuid {
274                         lg = s.expectLog(c, r)
275                 }
276                 c.Check(lg.EventType, check.Equals, etype)
277         }
278 }
279
280 // Generate some events by creating and updating a workflow object,
281 // and creating a custom log entry (event_type="blip") about the newly
282 // created workflow. If uuidChan is not nil, send the new workflow
283 // UUID to uuidChan as soon as it's known.
284 func (s *v0Suite) emitEvents(uuidChan chan<- string) {
285         s.wg.Add(1)
286         defer s.wg.Done()
287
288         ac := arvados.NewClientFromEnv()
289         ac.AuthToken = s.token
290         wf := &arvados.Workflow{
291                 Name: "ws_test",
292         }
293         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
294         if err != nil {
295                 panic(err)
296         }
297         if uuidChan != nil {
298                 uuidChan <- wf.UUID
299         }
300         lg := &arvados.Log{}
301         err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
302                 "object_uuid": wf.UUID,
303                 "event_type":  "blip",
304                 "properties": map[string]interface{}{
305                         "beep": "boop",
306                 },
307         }), nil)
308         if err != nil {
309                 panic(err)
310         }
311         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
312         if err != nil {
313                 panic(err)
314         }
315         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
316 }
317
318 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
319         val, ok := ob.(string)
320         if !ok {
321                 j, err := json.Marshal(ob)
322                 if err != nil {
323                         panic(err)
324                 }
325                 val = string(j)
326         }
327         v := url.Values{}
328         v[rscName] = []string{val}
329         return bytes.NewBufferString(v.Encode())
330 }
331
332 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
333         msg := map[string]interface{}{}
334         c.Check(r.Decode(&msg), check.IsNil)
335         c.Check(int(msg["status"].(float64)), check.Equals, status)
336 }
337
338 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
339         lg := &arvados.Log{}
340         ok := make(chan struct{})
341         go func() {
342                 for lg.ID <= s.ignoreLogID {
343                         c.Check(r.Decode(lg), check.IsNil)
344                 }
345                 close(ok)
346         }()
347         select {
348         case <-time.After(10 * time.Second):
349                 panic("timed out")
350         case <-ok:
351                 return lg
352         }
353 }
354
355 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
356         srv := s.serviceSuite.srv
357         conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
358         if err != nil {
359                 panic(err)
360         }
361         w := json.NewEncoder(conn)
362         r := json.NewDecoder(conn)
363         return conn, r, w
364 }
365
366 func (s *v0Suite) lastLogID(c *check.C) uint64 {
367         var lastID uint64
368         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
369         return lastID
370 }