21998: gofmt.
[arvados.git] / services / ws / session_v0_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "math/rand"
15         "net/url"
16         "os"
17         "strings"
18         "sync"
19         "sync/atomic"
20         "time"
21
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "golang.org/x/net/websocket"
26         check "gopkg.in/check.v1"
27 )
28
29 func init() {
30         if os.Getenv("ARVADOS_DEBUG") != "" {
31                 ctxlog.SetLevel("debug")
32         }
33 }
34
35 var _ = check.Suite(&v0Suite{})
36
37 type v0Suite struct {
38         serviceSuite serviceSuite
39         token        string
40         toDelete     []string
41         wg           sync.WaitGroup
42         ignoreLogID  int64
43 }
44
45 func (s *v0Suite) SetUpTest(c *check.C) {
46         s.serviceSuite.SetUpTest(c)
47         s.serviceSuite.start(c)
48
49         s.token = arvadostest.ActiveToken
50         s.ignoreLogID = s.lastLogID(c)
51 }
52
53 func (s *v0Suite) TearDownTest(c *check.C) {
54         s.wg.Wait()
55         s.serviceSuite.TearDownTest(c)
56 }
57
58 func (s *v0Suite) TearDownSuite(c *check.C) {
59         s.deleteTestObjects(c)
60 }
61
62 func (s *v0Suite) deleteTestObjects(c *check.C) {
63         ac := arvados.NewClientFromEnv()
64         ac.AuthToken = arvadostest.AdminToken
65         for _, path := range s.toDelete {
66                 err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
67                 c.Check(err, check.IsNil)
68         }
69         s.toDelete = nil
70 }
71
72 func (s *v0Suite) TestFilters(c *check.C) {
73         conn, r, w, err := s.testClient()
74         c.Assert(err, check.IsNil)
75         defer conn.Close()
76
77         cmd := func(method, eventType string, status int) {
78                 c.Check(w.Encode(map[string]interface{}{
79                         "method":  method,
80                         "filters": [][]interface{}{{"event_type", "in", []string{eventType}}},
81                 }), check.IsNil)
82                 s.expectStatus(c, r, status)
83         }
84         cmd("subscribe", "update", 200)
85         cmd("subscribe", "update", 200)
86         cmd("subscribe", "create", 200)
87         cmd("subscribe", "update", 200)
88         cmd("unsubscribe", "blip", 400)
89         cmd("unsubscribe", "create", 200)
90         cmd("unsubscribe", "update", 200)
91
92         go s.emitEvents(c, nil, nil)
93         lg := s.expectLog(c, r)
94         c.Check(lg.EventType, check.Equals, "update")
95
96         cmd("unsubscribe", "update", 200)
97         cmd("unsubscribe", "update", 200)
98         cmd("unsubscribe", "update", 400)
99 }
100
101 func (s *v0Suite) TestLastLogID(c *check.C) {
102         lastID := s.lastLogID(c)
103
104         checkLogs := func(r *json.Decoder, uuid string) {
105                 for _, etype := range []string{"create", "blip", "update"} {
106                         lg := s.expectLog(c, r)
107                         for lg.ObjectUUID != uuid {
108                                 lg = s.expectLog(c, r)
109                         }
110                         c.Check(lg.EventType, check.Equals, etype)
111                 }
112         }
113
114         // Connecting connEarly (before sending the early events) lets
115         // us confirm all of the "early" events have already passed
116         // through the server.
117         connEarly, rEarly, wEarly, err := s.testClient()
118         c.Assert(err, check.IsNil)
119         defer connEarly.Close()
120         c.Check(wEarly.Encode(map[string]interface{}{
121                 "method": "subscribe",
122         }), check.IsNil)
123         s.expectStatus(c, rEarly, 200)
124
125         // Send the early events.
126         uuidChan := make(chan string, 1)
127         s.emitEvents(c, uuidChan, nil)
128         uuidEarly := <-uuidChan
129
130         // Wait for the early events to pass through.
131         checkLogs(rEarly, uuidEarly)
132
133         // Connect the client that wants to get old events via
134         // last_log_id.
135         conn, r, w, err := s.testClient()
136         c.Assert(err, check.IsNil)
137         defer conn.Close()
138
139         c.Check(w.Encode(map[string]interface{}{
140                 "method":      "subscribe",
141                 "last_log_id": lastID,
142         }), check.IsNil)
143         s.expectStatus(c, r, 200)
144
145         checkLogs(r, uuidEarly)
146         s.emitEvents(c, uuidChan, nil)
147         checkLogs(r, <-uuidChan)
148 }
149
150 func (s *v0Suite) TestPermission(c *check.C) {
151         conn, r, w, err := s.testClient()
152         c.Assert(err, check.IsNil)
153         defer conn.Close()
154
155         c.Check(w.Encode(map[string]interface{}{
156                 "method": "subscribe",
157         }), check.IsNil)
158         s.expectStatus(c, r, 200)
159
160         uuidChan := make(chan string, 2)
161         go func() {
162                 s.token = arvadostest.AdminToken
163                 s.emitEvents(c, uuidChan, nil)
164                 s.token = arvadostest.ActiveToken
165                 s.emitEvents(c, uuidChan, nil)
166         }()
167
168         wrongUUID := <-uuidChan
169         rightUUID := <-uuidChan
170         lg := s.expectLog(c, r)
171         for lg.ObjectUUID != rightUUID {
172                 c.Check(lg.ObjectUUID, check.Not(check.Equals), wrongUUID)
173                 lg = s.expectLog(c, r)
174         }
175 }
176
177 // Two users create private objects; admin deletes both objects; each
178 // user receives a "delete" event for their own object (not for the
179 // other user's object).
180 func (s *v0Suite) TestEventTypeDelete(c *check.C) {
181         clients := []struct {
182                 token string
183                 uuid  string
184                 conn  *websocket.Conn
185                 r     *json.Decoder
186                 w     *json.Encoder
187         }{{token: arvadostest.ActiveToken}, {token: arvadostest.SpectatorToken}}
188         for i := range clients {
189                 uuidChan := make(chan string, 1)
190                 s.token = clients[i].token
191                 s.emitEvents(c, uuidChan, nil)
192                 clients[i].uuid = <-uuidChan
193
194                 var err error
195                 clients[i].conn, clients[i].r, clients[i].w, err = s.testClient()
196                 c.Assert(err, check.IsNil)
197
198                 c.Check(clients[i].w.Encode(map[string]interface{}{
199                         "method": "subscribe",
200                 }), check.IsNil)
201                 s.expectStatus(c, clients[i].r, 200)
202         }
203
204         s.ignoreLogID = s.lastLogID(c)
205         s.deleteTestObjects(c)
206
207         for _, client := range clients {
208                 lg := s.expectLog(c, client.r)
209                 c.Check(lg.ObjectUUID, check.Equals, client.uuid)
210                 c.Check(lg.EventType, check.Equals, "delete")
211         }
212 }
213
214 func (s *v0Suite) TestEventPropertiesFields(c *check.C) {
215         ac := arvados.NewClientFromEnv()
216         ac.AuthToken = s.token
217
218         conn, r, w, err := s.testClient()
219         c.Assert(err, check.IsNil)
220         defer conn.Close()
221
222         c.Check(w.Encode(map[string]interface{}{
223                 "method":  "subscribe",
224                 "filters": [][]string{{"object_uuid", "=", arvadostest.RunningContainerUUID}},
225         }), check.IsNil)
226         s.expectStatus(c, r, 200)
227
228         err = ac.RequestAndDecode(nil, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
229                 "object_uuid": arvadostest.RunningContainerUUID,
230                 "event_type":  "update",
231                 "properties": map[string]interface{}{
232                         "new_attributes": map[string]interface{}{
233                                 "name":                      "namevalue",
234                                 "requesting_container_uuid": "uuidvalue",
235                                 "state":                     "statevalue",
236                         },
237                 },
238         }), nil)
239         c.Assert(err, check.IsNil)
240
241         lg := s.expectLog(c, r)
242         c.Check(lg.ObjectUUID, check.Equals, arvadostest.RunningContainerUUID)
243         c.Check(lg.EventType, check.Equals, "update")
244         c.Check(lg.Properties["new_attributes"].(map[string]interface{})["requesting_container_uuid"], check.Equals, "uuidvalue")
245         c.Check(lg.Properties["new_attributes"].(map[string]interface{})["name"], check.Equals, "namevalue")
246         c.Check(lg.Properties["new_attributes"].(map[string]interface{})["state"], check.Equals, "statevalue")
247 }
248
249 // Trashing/deleting a collection produces an "update" event with
250 // properties["new_attributes"]["is_trashed"] == true.
251 func (s *v0Suite) TestTrashedCollection(c *check.C) {
252         ac := arvados.NewClientFromEnv()
253         ac.AuthToken = s.token
254
255         var coll arvados.Collection
256         err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", s.jsonBody("collection", `{"manifest_text":""}`), map[string]interface{}{"ensure_unique_name": true})
257         c.Assert(err, check.IsNil)
258         s.ignoreLogID = s.lastLogID(c)
259
260         conn, r, w, err := s.testClient()
261         c.Assert(err, check.IsNil)
262         defer conn.Close()
263
264         c.Check(w.Encode(map[string]interface{}{
265                 "method": "subscribe",
266         }), check.IsNil)
267         s.expectStatus(c, r, 200)
268
269         err = ac.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
270         c.Assert(err, check.IsNil)
271
272         lg := s.expectLog(c, r)
273         c.Check(lg.ObjectUUID, check.Equals, coll.UUID)
274         c.Check(lg.EventType, check.Equals, "update")
275         c.Check(lg.Properties["old_attributes"].(map[string]interface{})["is_trashed"], check.Equals, false)
276         c.Check(lg.Properties["new_attributes"].(map[string]interface{})["is_trashed"], check.Equals, true)
277 }
278
279 func (s *v0Suite) TestSendBadJSON(c *check.C) {
280         conn, r, w, err := s.testClient()
281         c.Assert(err, check.IsNil)
282         defer conn.Close()
283
284         c.Check(w.Encode(map[string]interface{}{
285                 "method": "subscribe",
286         }), check.IsNil)
287         s.expectStatus(c, r, 200)
288
289         _, err = fmt.Fprint(conn, "^]beep\n")
290         c.Check(err, check.IsNil)
291         s.expectStatus(c, r, 400)
292
293         c.Check(w.Encode(map[string]interface{}{
294                 "method": "subscribe",
295         }), check.IsNil)
296         s.expectStatus(c, r, 200)
297 }
298
299 func (s *v0Suite) TestSubscribe(c *check.C) {
300         conn, r, w, err := s.testClient()
301         c.Assert(err, check.IsNil)
302         defer conn.Close()
303
304         s.emitEvents(c, nil, nil)
305
306         err = w.Encode(map[string]interface{}{"21": 12})
307         c.Check(err, check.IsNil)
308         s.expectStatus(c, r, 400)
309
310         err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
311         c.Check(err, check.IsNil)
312         s.expectStatus(c, r, 200)
313
314         uuidChan := make(chan string, 1)
315         go s.emitEvents(c, uuidChan, nil)
316         uuid := <-uuidChan
317
318         for _, etype := range []string{"create", "blip", "update"} {
319                 lg := s.expectLog(c, r)
320                 for lg.ObjectUUID != uuid {
321                         lg = s.expectLog(c, r)
322                 }
323                 c.Check(lg.EventType, check.Equals, etype)
324         }
325 }
326
327 func (s *v0Suite) TestManyEventsAndSubscribers(c *check.C) {
328         // Frequent slow listener pings create the conditions for a
329         // deadlock issue with the lib/pq example listener usage.
330         //
331         // Specifically: a lib/pq/example/listen-style event loop can
332         // deadlock if enough (~32) server notifications arrive after
333         // the event loop decides to call Ping (e.g., while
334         // listener.Ping() is waiting for a response from the server,
335         // or in the time.Sleep() invoked by testSlowPing).
336         //
337         // (*ListenerConn)listenerConnLoop() doesn't see the server's
338         // ping response until it finishes sending a previous
339         // notification through its internal queue to
340         // (*Listener)listenerConnLoop(), which is blocked on sending
341         // to our Notify channel, which is blocked on waiting for the
342         // Ping response.
343         defer func(d time.Duration) {
344                 listenerPingInterval = d
345                 testSlowPing = false
346         }(listenerPingInterval)
347         listenerPingInterval = time.Second / 2
348         testSlowPing = true
349         // Restart the test server in order to get one that uses our
350         // test globals.
351         s.TearDownTest(c)
352         s.SetUpTest(c)
353
354         done := make(chan struct{})
355         defer close(done)
356         go s.emitEvents(c, nil, done)
357
358         // We will expect to receive at least one event during each
359         // one-second interval while the test is running.
360         t0 := time.Now()
361         seconds := 10
362         receivedPerSecond := make([]int64, seconds)
363
364         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(seconds)*time.Second))
365         defer cancel()
366         for clientID := 0; clientID < 100; clientID++ {
367                 clientID := clientID
368                 go func() {
369                         for ctx.Err() == nil {
370                                 conn, r, w, err := s.testClient()
371                                 if ctx.Err() != nil {
372                                         return
373                                 }
374                                 c.Assert(err, check.IsNil)
375                                 defer conn.Close()
376                                 err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
377                                 if ctx.Err() != nil {
378                                         return
379                                 }
380                                 c.Check(err, check.IsNil)
381                                 s.expectStatus(c, r, 200)
382                                 for {
383                                         if clientID%10 == 0 {
384                                                 // slow client
385                                                 time.Sleep(time.Second / 20)
386                                         } else if rand.Float64() < 0.01 {
387                                                 // disconnect+reconnect
388                                                 break
389                                         }
390                                         var lg arvados.Log
391                                         err := r.Decode(&lg)
392                                         if ctx.Err() != nil {
393                                                 return
394                                         }
395                                         if errors.Is(err, io.EOF) {
396                                                 break
397                                         }
398                                         c.Check(err, check.IsNil)
399                                         if i := int(time.Since(t0) / time.Second); i < seconds {
400                                                 atomic.AddInt64(&receivedPerSecond[i], 1)
401                                         }
402                                 }
403                                 conn.Close()
404                         }
405                 }()
406         }
407         <-ctx.Done()
408         c.Log("done")
409         for i, n := range receivedPerSecond {
410                 c.Logf("t<%d n=%d", i+1, n)
411                 c.Check(int64(n), check.Not(check.Equals), int64(0))
412         }
413 }
414
415 // Generate some events by creating and updating a workflow object,
416 // and creating a custom log entry (event_type="blip") about the newly
417 // created workflow.
418 //
419 // If uuidChan is not nil, send the new workflow UUID to uuidChan as
420 // soon as it's known.
421 //
422 // If done is not nil, keep generating events until done receives or
423 // closes.
424 func (s *v0Suite) emitEvents(c *check.C, uuidChan chan<- string, done <-chan struct{}) {
425         s.wg.Add(1)
426         defer s.wg.Done()
427
428         ac := arvados.NewClientFromEnv()
429         ac.AuthToken = s.token
430         wf := &arvados.Workflow{
431                 Name: "ws_test",
432         }
433         err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", `{"name":"ws_test"}`), map[string]interface{}{"ensure_unique_name": true})
434         c.Assert(err, check.IsNil)
435         s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID)
436         if uuidChan != nil {
437                 uuidChan <- wf.UUID
438         }
439         for i := 0; ; i++ {
440                 lg := &arvados.Log{}
441                 err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", map[string]interface{}{
442                         "object_uuid": wf.UUID,
443                         "event_type":  "blip",
444                         "properties": map[string]interface{}{
445                                 "beep": "boop",
446                         },
447                 }), nil)
448                 s.toDelete = append(s.toDelete, "arvados/v1/logs/"+lg.UUID)
449                 if done != nil {
450                         select {
451                         case <-done:
452                         default:
453                                 if i%50 == 0 {
454                                         time.Sleep(100 * time.Millisecond)
455                                 }
456                                 continue
457                         }
458                 }
459                 break
460         }
461         if err != nil {
462                 panic(err)
463         }
464         err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", `{"name":"ws_test"}`), nil)
465         if err != nil {
466                 panic(err)
467         }
468 }
469
470 func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
471         val, ok := ob.(string)
472         if !ok {
473                 j, err := json.Marshal(ob)
474                 if err != nil {
475                         panic(err)
476                 }
477                 val = string(j)
478         }
479         v := url.Values{}
480         v[rscName] = []string{val}
481         return bytes.NewBufferString(v.Encode())
482 }
483
484 func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
485         msg := map[string]interface{}{}
486         c.Check(r.Decode(&msg), check.IsNil)
487         c.Check(int(msg["status"].(float64)), check.Equals, status)
488 }
489
490 func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
491         lg := &arvados.Log{}
492         ok := make(chan struct{})
493         go func() {
494                 defer close(ok)
495                 for lg.ID <= s.ignoreLogID {
496                         c.Assert(r.Decode(lg), check.IsNil)
497                 }
498         }()
499         select {
500         case <-time.After(10 * time.Second):
501                 c.Error("timed out")
502                 c.FailNow()
503                 return lg
504         case <-ok:
505                 return lg
506         }
507 }
508
509 func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder, error) {
510         srv := s.serviceSuite.srv
511         conn, err := websocket.Dial(strings.Replace(srv.URL, "http", "ws", 1)+"/websocket?api_token="+s.token, "", srv.URL)
512         if err != nil {
513                 return nil, nil, nil, err
514         }
515         w := json.NewEncoder(conn)
516         r := json.NewDecoder(conn)
517         return conn, r, w, nil
518 }
519
520 func (s *v0Suite) lastLogID(c *check.C) int64 {
521         var lastID int64
522         c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
523         return lastID
524 }